ckb_sync/types/mod.rs

1use crate::{
2    FAST_INDEX, LOW_INDEX, NORMAL_INDEX, Status, StatusCode, TIME_TRACE_SIZE,
3    utils::async_send_message,
4};
5use ckb_app_config::SyncConfig;
6#[cfg(test)]
7use ckb_chain::VerifyResult;
8use ckb_chain::{ChainController, RemoteBlock};
9use ckb_chain_spec::consensus::{Consensus, MAX_BLOCK_INTERVAL, MIN_BLOCK_INTERVAL};
10use ckb_channel::Receiver;
11use ckb_constant::sync::{
12    BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW,
13    HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
14    MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
15    MAX_UNKNOWN_TX_HASHES_SIZE, MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER, POW_INTERVAL,
16    RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
17};
18use ckb_logger::{debug, error, info, trace, warn};
19use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
20use ckb_shared::{
21    Snapshot,
22    block_status::BlockStatus,
23    shared::Shared,
24    types::{HeaderIndex, HeaderIndexView, SHRINK_THRESHOLD},
25};
26use ckb_store::{ChainDB, ChainStore};
27use ckb_systemtime::unix_time_as_millis;
28use ckb_traits::{HeaderFields, HeaderFieldsProvider};
29use ckb_tx_pool::service::TxVerificationResult;
30use ckb_types::BlockNumberAndHash;
31use ckb_types::{
32    U256,
33    core::{self, BlockNumber, EpochExt},
34    packed::{self, Byte32},
35    prelude::*,
36};
37use ckb_util::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, shrink_to_fit};
38use dashmap::{self, DashMap};
39use keyed_priority_queue::{self, KeyedPriorityQueue};
40use lru::LruCache;
41use std::collections::{BTreeMap, HashMap, HashSet, btree_map::Entry};
42use std::hash::Hash;
43use std::sync::Arc;
44use std::sync::atomic::{AtomicUsize, Ordering};
45use std::time::{Duration, Instant};
46use std::{cmp, fmt, iter};
47
48const GET_HEADERS_CACHE_SIZE: usize = 10000;
// TODO: Needs discussion
50const GET_HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
51const FILTER_SIZE: usize = 50000;
52// 2 ** 13 < 6 * 1800 < 2 ** 14
53const ONE_DAY_BLOCK_NUMBER: u64 = 8192;
54pub(crate) const FILTER_TTL: u64 = 4 * 60 * 60;
55
56// State used to enforce CHAIN_SYNC_TIMEOUT
57// Only in effect for connections that are outbound, non-manual,
58// non-protected and non-whitelist.
59// Algorithm: if a peer's best known block has less work than our tip,
60// set a timeout CHAIN_SYNC_TIMEOUT seconds in the future:
61//   - If at timeout their best known block now has more work than our tip
62//     when the timeout was set, then either reset the timeout or clear it
63//     (after comparing against our current tip's work)
64//   - If at timeout their best known block still has less work than our
65//     tip did when the timeout was set, then send a getheaders message,
//     and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in the future.
67//     If their best known block is still behind when that new timeout is
68//     reached, disconnect.
69
70#[derive(Clone, Debug, Default)]
71pub struct ChainSyncState {
72    pub timeout: u64,
73    pub work_header: Option<core::HeaderView>,
74    pub total_difficulty: Option<U256>,
75    pub sent_getheaders: bool,
76    headers_sync_state: HeadersSyncState,
77}
78
79impl ChainSyncState {
80    fn can_start_sync(&self, now: u64) -> bool {
81        match self.headers_sync_state {
82            HeadersSyncState::Initialized => false,
83            HeadersSyncState::SyncProtocolConnected => true,
84            HeadersSyncState::Started => false,
85            HeadersSyncState::Suspend(until) | HeadersSyncState::TipSynced(until) => until < now,
86        }
87    }
88
89    fn connected(&mut self) {
90        self.headers_sync_state = HeadersSyncState::SyncProtocolConnected;
91    }
92
93    fn start(&mut self) {
94        self.headers_sync_state = HeadersSyncState::Started
95    }
96
97    fn suspend(&mut self, until: u64) {
98        self.headers_sync_state = HeadersSyncState::Suspend(until)
99    }
100
101    fn tip_synced(&mut self) {
102        let now = unix_time_as_millis();
103        let avg_interval = (MAX_BLOCK_INTERVAL + MIN_BLOCK_INTERVAL) / 2;
104        self.headers_sync_state = HeadersSyncState::TipSynced(now + avg_interval * 1000);
105    }
106
107    fn started(&self) -> bool {
108        matches!(self.headers_sync_state, HeadersSyncState::Started)
109    }
110
111    fn started_or_tip_synced(&self) -> bool {
112        matches!(
113            self.headers_sync_state,
114            HeadersSyncState::Started | HeadersSyncState::TipSynced(_)
115        )
116    }
117}
118
119#[derive(Default, Clone, Debug)]
120enum HeadersSyncState {
121    #[default]
122    Initialized,
123    SyncProtocolConnected,
124    Started,
125    Suspend(u64), // suspend headers sync until this timestamp (milliseconds since unix epoch)
    TipSynced(u64), // already synced to the peer's tip; don't use it as a sync target again until this timestamp (milliseconds since unix epoch) has passed
127}
128
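// Illustrative sketch for documentation (not part of the upstream module): walks the
// headers-sync state machine defined above, using arbitrary millisecond timestamps.
#[cfg(test)]
mod chain_sync_state_example {
    use super::*;

    #[test]
    fn headers_sync_state_transitions() {
        let mut state = ChainSyncState::default();
        // A freshly created peer has not connected the sync protocol yet.
        assert!(!state.can_start_sync(100));
        // Once the sync protocol is connected, the peer becomes a sync candidate.
        state.connected();
        assert!(state.can_start_sync(100));
        // While a sync round is running, the peer is not a candidate again.
        state.start();
        assert!(state.started());
        assert!(!state.can_start_sync(100));
        // A suspended peer only becomes a candidate again after the deadline passes.
        state.suspend(1_000);
        assert!(!state.can_start_sync(999));
        assert!(state.can_start_sync(1_001));
    }
}
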
129#[derive(Clone, Default, Debug, Copy)]
130pub struct PeerFlags {
131    pub is_outbound: bool,
132    pub is_protect: bool,
133    pub is_whitelist: bool,
134    pub is_2023edition: bool,
135}
136
137#[derive(Clone, Default, Debug, Copy)]
138pub struct HeadersSyncController {
139    // The timestamp when sync started
140    pub(crate) started_ts: u64,
141    // The timestamp of better tip header when sync started
142    pub(crate) started_tip_ts: u64,
143
144    // The timestamp when the process last updated
145    pub(crate) last_updated_ts: u64,
146    // The timestamp of better tip header when the process last updated
147    pub(crate) last_updated_tip_ts: u64,
148
149    pub(crate) is_close_to_the_end: bool,
150}
151
152impl HeadersSyncController {
153    #[cfg(test)]
154    pub(crate) fn new(
155        started_ts: u64,
156        started_tip_ts: u64,
157        last_updated_ts: u64,
158        last_updated_tip_ts: u64,
159        is_close_to_the_end: bool,
160    ) -> Self {
161        Self {
162            started_ts,
163            started_tip_ts,
164            last_updated_ts,
165            last_updated_tip_ts,
166            is_close_to_the_end,
167        }
168    }
169
170    pub(crate) fn from_header(better_tip_header: &HeaderIndexView) -> Self {
171        let started_ts = unix_time_as_millis();
172        let started_tip_ts = better_tip_header.timestamp();
173        Self {
174            started_ts,
175            started_tip_ts,
176            last_updated_ts: started_ts,
177            last_updated_tip_ts: started_tip_ts,
178            is_close_to_the_end: false,
179        }
180    }
181
    // https://github.com/rust-lang/rust-clippy/pull/8738
    // allow `wrong_self_convention` so that `is_timeout` can take `&mut self`
184    #[allow(clippy::wrong_self_convention)]
185    pub(crate) fn is_timeout(&mut self, now_tip_ts: u64, now: u64) -> Option<bool> {
186        let inspect_window = HEADERS_DOWNLOAD_INSPECT_WINDOW;
187        let expected_headers_per_sec = HEADERS_DOWNLOAD_HEADERS_PER_SECOND;
188        let tolerable_bias = HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE;
189
190        let expected_before_finished = now.saturating_sub(now_tip_ts);
191
192        trace!("headers-sync: better tip ts {}; now {}", now_tip_ts, now);
193
194        if self.is_close_to_the_end {
195            let expected_in_base_time =
196                expected_headers_per_sec * inspect_window * POW_INTERVAL / 1000;
197            if expected_before_finished > expected_in_base_time {
198                self.started_ts = now;
199                self.started_tip_ts = now_tip_ts;
200                self.last_updated_ts = now;
201                self.last_updated_tip_ts = now_tip_ts;
202                self.is_close_to_the_end = false;
                // the node has fallen too far behind the estimated tip header, so sync again
204                trace!(
205                    "headers-sync: send GetHeaders again since we are significantly behind the tip"
206                );
207                None
208            } else {
                // ignore the timeout because the tip has almost reached real time;
                // we can sync to the estimated tip within one inspect window even at the slowest acceptable speed.
211                Some(false)
212            }
213        } else if expected_before_finished < inspect_window {
214            self.is_close_to_the_end = true;
215            trace!("headers-sync: ignore timeout because the tip almost reaches the real time");
216            Some(false)
217        } else {
218            let spent_since_last_updated = now.saturating_sub(self.last_updated_ts);
219
220            if spent_since_last_updated < inspect_window {
221                // ignore timeout because the time spent since last updated is not enough as a sample
222                Some(false)
223            } else {
224                let synced_since_last_updated = now_tip_ts.saturating_sub(self.last_updated_tip_ts);
225                let expected_since_last_updated =
226                    expected_headers_per_sec * spent_since_last_updated * POW_INTERVAL / 1000;
227
228                if synced_since_last_updated < expected_since_last_updated / tolerable_bias {
                    // if the instantaneous speed is too slow, we don't care about the global average speed
230                    trace!("headers-sync: the instantaneous speed is too slow");
231                    Some(true)
232                } else {
233                    self.last_updated_ts = now;
234                    self.last_updated_tip_ts = now_tip_ts;
235
236                    if synced_since_last_updated > expected_since_last_updated {
237                        trace!("headers-sync: the instantaneous speed is acceptable");
238                        Some(false)
239                    } else {
                        // tolerate more bias for the instantaneous speed; we will check the global average speed instead
241                        let spent_since_started = now.saturating_sub(self.started_ts);
242                        let synced_since_started = now_tip_ts.saturating_sub(self.started_tip_ts);
243
244                        let expected_since_started =
245                            expected_headers_per_sec * spent_since_started * POW_INTERVAL / 1000;
246
247                        if synced_since_started < expected_since_started {
248                            // the global average speed is too slow
249                            trace!(
250                                "headers-sync: both the global average speed and the instantaneous speed \
251                                are slower than expected"
252                            );
253                            Some(true)
254                        } else {
255                            trace!("headers-sync: the global average speed is acceptable");
256                            Some(false)
257                        }
258                    }
259                }
260            }
261        }
262    }
263}
264
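// Illustrative sketch for documentation (not part of the upstream module): when the better
// tip's timestamp is already close to the wall clock, `is_timeout` flips
// `is_close_to_the_end` and keeps reporting "no timeout".
#[cfg(test)]
mod headers_sync_controller_example {
    use super::*;

    #[test]
    fn near_tip_is_never_a_timeout() {
        let now = unix_time_as_millis();
        let mut controller = HeadersSyncController::new(now, now, now, now, false);
        // expected_before_finished == 0, which is below the inspect window.
        assert_eq!(controller.is_timeout(now, now), Some(false));
        assert!(controller.is_close_to_the_end);
        // Subsequent calls with the same near-real-time tip remain non-timeouts.
        assert_eq!(controller.is_timeout(now, now), Some(false));
    }
}
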
265#[derive(Clone, Default, Debug)]
266pub struct PeerState {
267    pub headers_sync_controller: Option<HeadersSyncController>,
268    pub peer_flags: PeerFlags,
269    pub chain_sync: ChainSyncState,
270    // The best known block we know this peer has announced
271    pub best_known_header: Option<HeaderIndex>,
272    // The last block we both stored
273    pub last_common_header: Option<BlockNumberAndHash>,
    // used during IBD concurrent block download;
    // stores the `get_headers` locator hashes here
276    pub unknown_header_list: Vec<Byte32>,
277}
278
279impl PeerState {
280    pub fn new(peer_flags: PeerFlags) -> PeerState {
281        PeerState {
282            headers_sync_controller: None,
283            peer_flags,
284            chain_sync: ChainSyncState::default(),
285            best_known_header: None,
286            last_common_header: None,
287            unknown_header_list: Vec::new(),
288        }
289    }
290
291    pub fn can_start_sync(&self, now: u64, ibd: bool) -> bool {
        // during IBD, only sync with protected/whitelisted peers
293        ((self.peer_flags.is_protect || self.peer_flags.is_whitelist) || !ibd)
294            && self.chain_sync.can_start_sync(now)
295    }
296
297    pub fn start_sync(&mut self, headers_sync_controller: HeadersSyncController) {
298        self.chain_sync.start();
299        self.headers_sync_controller = Some(headers_sync_controller);
300    }
301
302    fn suspend_sync(&mut self, suspend_time: u64) {
303        let now = unix_time_as_millis();
304        self.chain_sync.suspend(now + suspend_time);
305        self.headers_sync_controller = None;
306    }
307
308    fn tip_synced(&mut self) {
309        self.chain_sync.tip_synced();
310        self.headers_sync_controller = None;
311    }
312
313    pub(crate) fn sync_started(&self) -> bool {
314        self.chain_sync.started()
315    }
316
317    pub(crate) fn started_or_tip_synced(&self) -> bool {
318        self.chain_sync.started_or_tip_synced()
319    }
320
321    pub(crate) fn sync_connected(&mut self) {
322        self.chain_sync.connected()
323    }
324}
325
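// Illustrative sketch for documentation (not part of the upstream module): during IBD only
// protected or whitelisted peers are accepted as headers-sync targets, while outside IBD any
// connected peer qualifies.
#[cfg(test)]
mod peer_state_example {
    use super::*;

    #[test]
    fn ibd_only_syncs_with_protected_or_whitelisted_peers() {
        let now = unix_time_as_millis();

        let mut ordinary = PeerState::new(PeerFlags::default());
        ordinary.sync_connected();
        assert!(!ordinary.can_start_sync(now, true)); // ibd = true
        assert!(ordinary.can_start_sync(now, false)); // ibd = false

        let mut protected = PeerState::new(PeerFlags {
            is_protect: true,
            ..Default::default()
        });
        protected.sync_connected();
        assert!(protected.can_start_sync(now, true));
    }
}
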
326pub struct TtlFilter<T> {
327    inner: LruCache<T, u64>,
328    ttl: u64,
329}
330
331impl<T: Eq + Hash + Clone> Default for TtlFilter<T> {
332    fn default() -> Self {
333        TtlFilter::new(FILTER_SIZE, FILTER_TTL)
334    }
335}
336
337impl<T: Eq + Hash + Clone> TtlFilter<T> {
338    pub fn new(size: usize, ttl: u64) -> Self {
339        Self {
340            inner: LruCache::new(size),
341            ttl,
342        }
343    }
344
345    pub fn contains(&self, item: &T) -> bool {
346        self.inner.contains(item)
347    }
348
349    pub fn insert(&mut self, item: T) -> bool {
350        let now = ckb_systemtime::unix_time().as_secs();
351        self.inner.put(item, now).is_none()
352    }
353
354    pub fn remove(&mut self, item: &T) -> bool {
355        self.inner.pop(item).is_some()
356    }
357
358    /// Removes expired items.
359    pub fn remove_expired(&mut self) {
360        let now = ckb_systemtime::unix_time().as_secs();
361        let expired_keys: Vec<T> = self
362            .inner
363            .iter()
364            .filter_map(|(key, time)| {
365                if *time + self.ttl < now {
366                    Some(key)
367                } else {
368                    None
369                }
370            })
371            .cloned()
372            .collect();
373
374        for k in expired_keys {
375            self.remove(&k);
376        }
377    }
378}
379
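// Illustrative sketch for documentation (not part of the upstream module): basic TtlFilter
// usage. Expiry is driven by wall-clock seconds, so only the insert/contains/LRU-eviction
// behaviour is asserted here.
#[cfg(test)]
mod ttl_filter_example {
    use super::*;

    #[test]
    fn ttl_filter_basic_usage() {
        let mut filter: TtlFilter<u32> = TtlFilter::new(2, FILTER_TTL);
        assert!(filter.insert(1)); // first insertion returns true
        assert!(!filter.insert(1)); // re-inserting an existing key returns false
        assert!(filter.contains(&1));
        // Capacity is 2, so inserting two more keys evicts the least recently used one.
        filter.insert(2);
        filter.insert(3);
        assert!(!filter.contains(&1));
        assert!(filter.remove(&3));
        assert!(!filter.remove(&3));
    }
}
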
380#[derive(Default)]
381pub struct Peers {
382    pub state: DashMap<PeerIndex, PeerState>,
383    pub n_sync_started: AtomicUsize,
384    pub n_protected_outbound_peers: AtomicUsize,
385}
386
387#[derive(Debug, Clone)]
388pub struct InflightState {
389    pub(crate) peer: PeerIndex,
390    pub(crate) timestamp: u64,
391}
392
393impl InflightState {
394    fn new(peer: PeerIndex) -> Self {
395        Self {
396            peer,
397            timestamp: unix_time_as_millis(),
398        }
399    }
400}
401
402enum TimeQuantile {
403    MinToFast,
404    FastToNormal,
405    NormalToUpper,
406    UpperToMax,
407}
408
/// Using 512 blocks as a period, dynamically adjust the scheduler's time standard.
/// The samples are divided into four zones:
///
/// | fast | normal | penalty | double penalty |
///
/// The dividing lines sit at the 1/3, 4/5 and 9/10 positions (i.e. 1/10 from the end).
///
/// That gives a 1/3 accelerated-reward zone, a 14/30 normal zone, a 1/10 penalty zone and
/// a 1/10 double-penalty zone.
///
/// Most of the nodes that fall into the accelerated-reward and normal zones are retained,
/// while most of the nodes that fall into the penalty and double-penalty zones are slowly
/// eliminated.
///
/// The purpose of this dynamic tuning is to reduce the resource consumption of the sync
/// network by retaining the vast majority of nodes with stable communications and
/// cleaning up nodes whose response times are significantly worse than a certain level.
424#[derive(Clone)]
425struct TimeAnalyzer {
426    trace: [u64; TIME_TRACE_SIZE],
427    index: usize,
428    fast_time: u64,
429    normal_time: u64,
430    low_time: u64,
431}
432
433impl fmt::Debug for TimeAnalyzer {
434    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
435        fmt.debug_struct("TimeAnalyzer")
436            .field("fast_time", &self.fast_time)
437            .field("normal_time", &self.normal_time)
438            .field("low_time", &self.low_time)
439            .finish()
440    }
441}
442
443impl Default for TimeAnalyzer {
444    fn default() -> Self {
        // The max block size is about 700 KB; under roughly 10 Mbit/s of bandwidth a response may take about 1 s
446        Self {
447            trace: [0; TIME_TRACE_SIZE],
448            index: 0,
449            fast_time: 1000,
450            normal_time: 1250,
451            low_time: 1500,
452        }
453    }
454}
455
456impl TimeAnalyzer {
457    fn push_time(&mut self, time: u64) -> TimeQuantile {
458        if self.index < TIME_TRACE_SIZE {
459            self.trace[self.index] = time;
460            self.index += 1;
461        } else {
462            self.trace.sort_unstable();
463            self.fast_time = (self.fast_time.saturating_add(self.trace[FAST_INDEX])) >> 1;
464            self.normal_time = (self.normal_time.saturating_add(self.trace[NORMAL_INDEX])) >> 1;
465            self.low_time = (self.low_time.saturating_add(self.trace[LOW_INDEX])) >> 1;
466            self.trace[0] = time;
467            self.index = 1;
468        }
469
470        if time <= self.fast_time {
471            TimeQuantile::MinToFast
472        } else if time <= self.normal_time {
473            TimeQuantile::FastToNormal
474        } else if time > self.low_time {
475            TimeQuantile::UpperToMax
476        } else {
477            TimeQuantile::NormalToUpper
478        }
479    }
480}
481
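// Illustrative sketch for documentation (not part of the upstream module): classify a few
// response times against the default thresholds documented above (fast 1000 ms, normal
// 1250 ms, low 1500 ms). The thresholds are only re-derived after TIME_TRACE_SIZE samples,
// so the defaults still apply here.
#[cfg(test)]
mod time_analyzer_example {
    use super::*;

    #[test]
    fn time_quantile_classification() {
        let mut analyzer = TimeAnalyzer::default();
        assert!(matches!(analyzer.push_time(800), TimeQuantile::MinToFast));
        assert!(matches!(analyzer.push_time(1_100), TimeQuantile::FastToNormal));
        assert!(matches!(analyzer.push_time(1_400), TimeQuantile::NormalToUpper));
        assert!(matches!(analyzer.push_time(2_000), TimeQuantile::UpperToMax));
    }
}
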
482#[derive(Debug, Clone)]
483pub struct DownloadScheduler {
484    task_count: usize,
485    timeout_count: usize,
486    hashes: HashSet<BlockNumberAndHash>,
487}
488
489impl Default for DownloadScheduler {
490    fn default() -> Self {
491        Self {
492            hashes: HashSet::default(),
493            task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER,
494            timeout_count: 0,
495        }
496    }
497}
498
499impl DownloadScheduler {
500    fn inflight_count(&self) -> usize {
501        self.hashes.len()
502    }
503
504    fn can_fetch(&self) -> usize {
505        self.task_count.saturating_sub(self.hashes.len())
506    }
507
508    pub(crate) const fn task_count(&self) -> usize {
509        self.task_count
510    }
511
512    fn increase(&mut self, num: usize) {
513        if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER {
514            self.task_count = ::std::cmp::min(
515                self.task_count.saturating_add(num),
516                MAX_BLOCKS_IN_TRANSIT_PER_PEER,
517            )
518        }
519    }
520
521    fn decrease(&mut self, num: usize) {
522        self.timeout_count = self.task_count.saturating_add(num);
523        if self.timeout_count > 2 {
524            self.task_count = self.task_count.saturating_sub(1);
525            self.timeout_count = 0;
526        }
527    }
528
529    fn punish(&mut self, exp: usize) {
530        self.task_count >>= exp
531    }
532}
533
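// Illustrative sketch for documentation (not part of the upstream module): how a peer's task
// budget reacts to rewards and punishments. The INIT/MAX constants come from
// ckb_constant::sync.
#[cfg(test)]
mod download_scheduler_example {
    use super::*;

    #[test]
    fn task_count_adjustment() {
        let mut scheduler = DownloadScheduler::default();
        assert_eq!(scheduler.task_count(), INIT_BLOCKS_IN_TRANSIT_PER_PEER);
        // Fast responses grow the budget, but never beyond the hard cap.
        scheduler.increase(MAX_BLOCKS_IN_TRANSIT_PER_PEER * 2);
        assert_eq!(scheduler.task_count(), MAX_BLOCKS_IN_TRANSIT_PER_PEER);
        // A punishment shifts the budget right by `exp` bits, i.e. halves it for exp == 1.
        scheduler.punish(1);
        assert_eq!(scheduler.task_count(), MAX_BLOCKS_IN_TRANSIT_PER_PEER / 2);
        // With no blocks in flight, the whole budget is available for fetching.
        assert_eq!(scheduler.can_fetch(), scheduler.task_count());
    }
}
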
534#[derive(Clone)]
535pub struct InflightBlocks {
536    pub(crate) download_schedulers: HashMap<PeerIndex, DownloadScheduler>,
537    inflight_states: BTreeMap<BlockNumberAndHash, InflightState>,
538    pub(crate) trace_number: HashMap<BlockNumberAndHash, u64>,
539    pub(crate) restart_number: BlockNumber,
540    time_analyzer: TimeAnalyzer,
541    pub(crate) adjustment: bool,
542    pub(crate) protect_num: usize,
543}
544
545impl Default for InflightBlocks {
546    fn default() -> Self {
547        InflightBlocks {
548            download_schedulers: HashMap::default(),
549            inflight_states: BTreeMap::default(),
550            trace_number: HashMap::default(),
551            restart_number: 0,
552            time_analyzer: TimeAnalyzer::default(),
553            adjustment: true,
554            protect_num: MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
555        }
556    }
557}
558
559struct DebugHashSet<'a>(&'a HashSet<BlockNumberAndHash>);
560
561impl<'a> fmt::Debug for DebugHashSet<'a> {
562    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
563        fmt.debug_set()
564            .entries(self.0.iter().map(|h| format!("{}, {}", h.number, h.hash)))
565            .finish()
566    }
567}
568
569impl fmt::Debug for InflightBlocks {
570    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
571        fmt.debug_map()
572            .entries(
573                self.download_schedulers
574                    .iter()
575                    .map(|(k, v)| (k, DebugHashSet(&v.hashes))),
576            )
577            .finish()?;
578        fmt.debug_map()
579            .entries(
580                self.inflight_states
581                    .iter()
582                    .map(|(k, v)| (format!("{}, {}", k.number, k.hash), v)),
583            )
584            .finish()?;
585        self.time_analyzer.fmt(fmt)
586    }
587}
588
589impl InflightBlocks {
590    pub fn blocks_iter(&self) -> impl Iterator<Item = (&PeerIndex, &HashSet<BlockNumberAndHash>)> {
591        self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes))
592    }
593
594    pub fn total_inflight_count(&self) -> usize {
595        self.inflight_states.len()
596    }
597
598    pub fn division_point(&self) -> (u64, u64, u64) {
599        (
600            self.time_analyzer.fast_time,
601            self.time_analyzer.normal_time,
602            self.time_analyzer.low_time,
603        )
604    }
605
606    pub fn peer_inflight_count(&self, peer: PeerIndex) -> usize {
607        self.download_schedulers
608            .get(&peer)
609            .map(DownloadScheduler::inflight_count)
610            .unwrap_or(0)
611    }
612
613    pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize {
614        self.download_schedulers.get(&peer).map_or(
615            INIT_BLOCKS_IN_TRANSIT_PER_PEER,
616            DownloadScheduler::can_fetch,
617        )
618    }
619
620    pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet<BlockNumberAndHash>> {
621        self.download_schedulers.get(&peer).map(|d| &d.hashes)
622    }
623
624    pub fn inflight_state_by_block(&self, block: &BlockNumberAndHash) -> Option<&InflightState> {
625        self.inflight_states.get(block)
626    }
627
628    pub fn mark_slow_block(&mut self, tip: BlockNumber) {
629        let now = ckb_systemtime::unix_time_as_millis();
630        for key in self.inflight_states.keys() {
631            if key.number > tip + 1 {
632                break;
633            }
634            self.trace_number.entry(key.clone()).or_insert(now);
635        }
636    }
637
638    pub fn prune(&mut self, tip: BlockNumber) -> HashSet<PeerIndex> {
639        let now = unix_time_as_millis();
640        let mut disconnect_list = HashSet::new();
        // Since the statistics are currently disturbed by block processing time, a peer may be
        // accidentally evicted when the number of transactions increases.
        //
        // Especially on machines with poor CPU performance, peer connections would be frequently
        // disconnected because of these statistics.
        //
        // In order to protect the decentralization of the network and ensure the survival of
        // low-performance nodes, the penalty mechanism is disabled when the number of downloading
        // peers does not exceed the number of protected peers.
649        let should_punish = self.download_schedulers.len() > self.protect_num;
650        let adjustment = self.adjustment;
651
652        let trace = &mut self.trace_number;
653        let download_schedulers = &mut self.download_schedulers;
654        let states = &mut self.inflight_states;
655
656        let mut remove_key = Vec::new();
        // Since this is a BTreeMap the data is already sorted, so we don't have to worry about
        // missing entries, and we don't need to iterate through all the data each time: checking
        // within tip + 20 is enough, with the checkpoint marking possible blocking points.
661        let end = tip + 20;
662        for (key, value) in states.iter() {
663            if key.number > end {
664                break;
665            }
666            if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now {
667                if let Some(set) = download_schedulers.get_mut(&value.peer) {
668                    set.hashes.remove(key);
669                    if should_punish && adjustment {
670                        set.punish(2);
671                    }
672                };
673                if !trace.is_empty() {
674                    trace.remove(key);
675                }
676                remove_key.push(key.clone());
677                debug!(
678                    "prune: remove InflightState: remove {}-{} from {}",
679                    key.number, key.hash, value.peer
680                );
681
682                if let Some(metrics) = ckb_metrics::handle() {
683                    metrics.ckb_inflight_timeout_count.inc();
684                }
685            }
686        }
687
688        for key in remove_key {
689            states.remove(&key);
690        }
691
692        download_schedulers.retain(|k, v| {
693            // task number zero means this peer's response is very slow
694            if v.task_count == 0 {
695                disconnect_list.insert(*k);
696                false
697            } else {
698                true
699            }
700        });
701        shrink_to_fit!(download_schedulers, SHRINK_THRESHOLD);
702
703        if self.restart_number != 0 && tip + 1 > self.restart_number {
704            self.restart_number = 0;
705        }
706
        // Since every environment is different, the policy here must also be adjusted dynamically
        // to the current environment; a lower limit is applied because restarting a task too
        // frequently costs more than tolerating that lower limit.
710        let timeout_limit = self.time_analyzer.low_time;
711
712        let restart_number = &mut self.restart_number;
713        trace.retain(|key, time| {
            // In the normal state, trace is always empty.
            //
            // When an in-flight request reaches the checkpoint (inflight > tip + 512), it means
            // there is an anomaly in the sync below tip + 1, i.e. some nodes are stuck; at that
            // point the block is recorded with the timestamp at that time.
            //
            // If the time exceeds the low time limit, delete the task and halve the number of
            // executable tasks for the corresponding node.
722            if now > timeout_limit + *time {
723                if let Some(state) = states.remove(key) {
724                    if let Some(d) = download_schedulers.get_mut(&state.peer) {
725                        if should_punish && adjustment {
726                            d.punish(1);
727                        }
728                        d.hashes.remove(key);
729                        debug!(
730                            "prune: remove download_schedulers: remove {}-{} from {}",
731                            key.number, key.hash, state.peer
732                        );
733                    };
734                }
735
736                if key.number > *restart_number {
737                    *restart_number = key.number;
738                }
739                return false;
740            }
741            true
742        });
743        shrink_to_fit!(trace, SHRINK_THRESHOLD);
744
745        disconnect_list
746    }
747
748    pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool {
749        let state = self.inflight_states.entry(block.clone());
750        match state {
751            Entry::Occupied(_entry) => return false,
752            Entry::Vacant(entry) => entry.insert(InflightState::new(peer)),
753        };
754
755        if self.restart_number >= block.number {
            // A new request at or below restart_number is one that was just cleaned up; record it
            // with the current timestamp so it cannot immediately be marked as cleaned up again.
758            self.trace_number
759                .insert(block.clone(), unix_time_as_millis());
760        }
761
762        let download_scheduler = self.download_schedulers.entry(peer).or_default();
763        download_scheduler.hashes.insert(block)
764    }
765
766    pub fn remove_by_peer(&mut self, peer: PeerIndex) -> usize {
767        let trace = &mut self.trace_number;
768        let state = &mut self.inflight_states;
769
770        self.download_schedulers
771            .remove(&peer)
772            .map(|blocks| {
773                let blocks_count = blocks.hashes.iter().len();
774                for block in blocks.hashes {
775                    state.remove(&block);
776                    if !trace.is_empty() {
777                        trace.remove(&block);
778                    }
779                }
780                blocks_count
781            })
782            .unwrap_or_default()
783    }
784
785    pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool {
786        let should_punish = self.download_schedulers.len() > self.protect_num;
787        let download_schedulers = &mut self.download_schedulers;
788        let trace = &mut self.trace_number;
789        let time_analyzer = &mut self.time_analyzer;
790        let adjustment = self.adjustment;
791        self.inflight_states
792            .remove(&block)
793            .map(|state| {
794                let elapsed = unix_time_as_millis().saturating_sub(state.timestamp);
795                if let Some(set) = download_schedulers.get_mut(&state.peer) {
796                    set.hashes.remove(&block);
797                    if adjustment {
798                        match time_analyzer.push_time(elapsed) {
799                            TimeQuantile::MinToFast => set.increase(2),
800                            TimeQuantile::FastToNormal => set.increase(1),
801                            TimeQuantile::NormalToUpper => {
802                                if should_punish {
803                                    set.decrease(1)
804                                }
805                            }
806                            TimeQuantile::UpperToMax => {
807                                if should_punish {
808                                    set.decrease(2)
809                                }
810                            }
811                        }
812                    }
813                    if !trace.is_empty() {
814                        trace.remove(&block);
815                    }
816                };
817            })
818            .is_some()
819    }
820}
821
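// Illustrative sketch for documentation (not part of the upstream module): tracking a single
// in-flight block request. The peer index and hash are dummy values; building a PeerIndex
// from a usize and a BlockNumberAndHash from a (number, hash) tuple are assumptions based on
// how these types are used elsewhere in this file.
#[cfg(test)]
mod inflight_blocks_example {
    use super::*;

    #[test]
    fn insert_and_remove_inflight_block() {
        let mut inflight = InflightBlocks::default();
        let peer: PeerIndex = 1usize.into();
        let block: BlockNumberAndHash = (42u64, Byte32::zero()).into();

        // The first request for this block is accepted, a duplicate is rejected.
        assert!(inflight.insert(peer, block.clone()));
        assert!(!inflight.insert(peer, block.clone()));
        assert_eq!(inflight.total_inflight_count(), 1);
        assert_eq!(inflight.peer_inflight_count(peer), 1);

        // Receiving the block clears its in-flight state.
        assert!(inflight.remove_by_block(block.clone()));
        assert!(!inflight.remove_by_block(block));
        assert_eq!(inflight.total_inflight_count(), 0);
    }
}
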
822impl Peers {
823    pub fn sync_connected(
824        &self,
825        peer: PeerIndex,
826        is_outbound: bool,
827        is_whitelist: bool,
828        is_2023edition: bool,
829    ) {
830        let protect_outbound = is_outbound
831            && self
832                .n_protected_outbound_peers
833                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
834                    if x < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT {
835                        Some(x + 1)
836                    } else {
837                        None
838                    }
839                })
840                .is_ok();
841
842        let peer_flags = PeerFlags {
843            is_outbound,
844            is_whitelist,
845            is_2023edition,
846            is_protect: protect_outbound,
847        };
848        self.state
849            .entry(peer)
850            .and_modify(|state| {
851                state.peer_flags = peer_flags;
852                state.sync_connected();
853            })
854            .or_insert_with(|| {
855                let mut state = PeerState::new(peer_flags);
856                state.sync_connected();
857                state
858            });
859    }
860
861    pub fn relay_connected(&self, peer: PeerIndex) {
862        self.state
863            .entry(peer)
864            .or_insert_with(|| PeerState::new(PeerFlags::default()));
865    }
866
867    pub fn get_best_known_header(&self, pi: PeerIndex) -> Option<HeaderIndex> {
868        self.state
869            .get(&pi)
870            .and_then(|peer_state| peer_state.best_known_header.clone())
871    }
872
873    pub fn may_set_best_known_header(&self, peer: PeerIndex, header_index: HeaderIndex) {
874        if let Some(mut peer_state) = self.state.get_mut(&peer) {
875            if let Some(ref known) = peer_state.best_known_header {
876                if header_index.is_better_chain(known) {
877                    peer_state.best_known_header = Some(header_index);
878                }
879            } else {
880                peer_state.best_known_header = Some(header_index);
881            }
882        }
883    }
884
885    pub fn get_last_common_header(&self, pi: PeerIndex) -> Option<BlockNumberAndHash> {
886        self.state
887            .get(&pi)
888            .and_then(|peer_state| peer_state.last_common_header.clone())
889    }
890
891    pub fn set_last_common_header(&self, pi: PeerIndex, header: BlockNumberAndHash) {
892        self.state
893            .entry(pi)
894            .and_modify(|peer_state| peer_state.last_common_header = Some(header));
895    }
896
897    pub fn getheaders_received(&self, _peer: PeerIndex) {
898        // TODO:
899    }
900
901    pub fn disconnected(&self, peer: PeerIndex) {
902        if let Some(peer_state) = self.state.remove(&peer).map(|(_, peer_state)| peer_state) {
903            if peer_state.sync_started() {
                // This shouldn't happen.
                // fetch_sub wraps around on overflow, so we still check manually and
                // panic here to prevent some bug from being hidden silently.
907                assert_ne!(
908                    self.n_sync_started.fetch_sub(1, Ordering::AcqRel),
909                    0,
910                    "n_sync_started overflow when disconnects"
911                );
912            }
913
914            // Protection node disconnected
915            if peer_state.peer_flags.is_protect {
916                assert_ne!(
917                    self.n_protected_outbound_peers
918                        .fetch_sub(1, Ordering::AcqRel),
919                    0,
920                    "n_protected_outbound_peers overflow when disconnects"
921                );
922            }
923        }
924    }
925
926    pub fn insert_unknown_header_hash(&self, peer: PeerIndex, hash: Byte32) {
927        self.state
928            .entry(peer)
929            .and_modify(|state| state.unknown_header_list.push(hash));
930    }
931
932    pub fn unknown_header_list_is_empty(&self, peer: PeerIndex) -> bool {
933        self.state
934            .get(&peer)
935            .map(|state| state.unknown_header_list.is_empty())
936            .unwrap_or(true)
937    }
938
939    pub fn clear_unknown_list(&self) {
940        self.state.iter_mut().for_each(|mut state| {
941            if !state.unknown_header_list.is_empty() {
942                state.unknown_header_list.clear()
943            }
944        })
945    }
946
947    pub fn get_best_known_less_than_tip_and_unknown_empty(
948        &self,
949        tip: BlockNumber,
950    ) -> Vec<PeerIndex> {
951        self.state
952            .iter()
953            .filter_map(|kv_pair| {
954                let (peer_index, state) = kv_pair.pair();
955                if !state.unknown_header_list.is_empty() {
956                    return None;
957                }
958                if let Some(ref header) = state.best_known_header {
959                    if header.number() < tip {
960                        return Some(*peer_index);
961                    }
962                }
963                None
964            })
965            .collect()
966    }
967
968    pub fn take_unknown_last(&self, peer: PeerIndex) -> Option<Byte32> {
969        self.state
970            .get_mut(&peer)
971            .and_then(|mut state| state.unknown_header_list.pop())
972    }
973
974    pub fn get_flag(&self, peer: PeerIndex) -> Option<PeerFlags> {
975        self.state.get(&peer).map(|state| state.peer_flags)
976    }
977}
978
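// Illustrative sketch for documentation (not part of the upstream module): only the first
// MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT outbound peers receive a protection slot.
// Peer indices are built from usize values, which assumes `PeerIndex: From<usize>`.
#[cfg(test)]
mod peers_example {
    use super::*;

    #[test]
    fn outbound_protection_slots_are_limited() {
        let peers = Peers::default();
        for i in 0..MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT {
            let pi: PeerIndex = i.into();
            peers.sync_connected(pi, true, false, false);
            assert!(peers.get_flag(pi).unwrap().is_protect);
        }
        // The next outbound peer no longer gets a protection slot.
        let extra: PeerIndex = (MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT + 1).into();
        peers.sync_connected(extra, true, false, false);
        assert!(!peers.get_flag(extra).unwrap().is_protect);
    }
}
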
979// <CompactBlockHash, (CompactBlock, <PeerIndex, (Vec<TransactionsIndex>, Vec<UnclesIndex>)>, timestamp)>
980pub(crate) type PendingCompactBlockMap = HashMap<
981    Byte32,
982    (
983        packed::CompactBlock,
984        HashMap<PeerIndex, (Vec<u32>, Vec<u32>)>,
985        u64,
986    ),
987>;
988
989/// Sync state shared between sync and relayer protocol
990#[derive(Clone)]
991pub struct SyncShared {
992    shared: Shared,
993    state: Arc<SyncState>,
994}
995
996impl SyncShared {
997    /// Create a SyncShared
998    pub fn new(
999        shared: Shared,
1000        sync_config: SyncConfig,
1001        tx_relay_receiver: Receiver<TxVerificationResult>,
1002    ) -> SyncShared {
1003        let (total_difficulty, header) = {
1004            let snapshot = shared.snapshot();
1005            (
1006                snapshot.total_difficulty().to_owned(),
1007                snapshot.tip_header().to_owned(),
1008            )
1009        };
1010        let shared_best_header = RwLock::new((header, total_difficulty).into());
1011        info!(
1012            "header_map.memory_limit {}",
1013            sync_config.header_map.memory_limit
1014        );
1015
1016        let state = SyncState {
1017            shared_best_header,
1018            tx_filter: Mutex::new(TtlFilter::default()),
1019            unknown_tx_hashes: Mutex::new(KeyedPriorityQueue::new()),
1020            peers: Peers::default(),
1021            pending_get_block_proposals: DashMap::new(),
1022            pending_compact_blocks: tokio::sync::Mutex::new(HashMap::default()),
1023            inflight_proposals: DashMap::new(),
1024            inflight_blocks: RwLock::new(InflightBlocks::default()),
1025            pending_get_headers: RwLock::new(LruCache::new(GET_HEADERS_CACHE_SIZE)),
1026            tx_relay_receiver,
1027            min_chain_work: sync_config.min_chain_work,
1028        };
1029
1030        SyncShared {
1031            shared,
1032            state: Arc::new(state),
1033        }
1034    }
1035
1036    /// Shared chain db/config
1037    pub fn shared(&self) -> &Shared {
1038        &self.shared
1039    }
1040
1041    /// Get snapshot with current chain
1042    pub fn active_chain(&self) -> ActiveChain {
1043        ActiveChain {
1044            sync_shared: self.clone(),
1045            snapshot: Arc::clone(&self.shared.snapshot()),
1046        }
1047    }
1048
1049    /// Get chain store
1050    pub fn store(&self) -> &ChainDB {
1051        self.shared.store()
1052    }
1053
1054    /// Get sync state
1055    pub fn state(&self) -> &SyncState {
1056        &self.state
1057    }
1058
1059    /// Get consensus config
1060    pub fn consensus(&self) -> &Consensus {
1061        self.shared.consensus()
1062    }
1063
1064    // Only used by unit test
1065    // Blocking insert a new block, return the verify result
1066    #[cfg(test)]
1067    pub(crate) fn blocking_insert_new_block(
1068        &self,
1069        chain: &ChainController,
1070        block: Arc<core::BlockView>,
1071    ) -> VerifyResult {
1072        chain.blocking_process_block(block)
1073    }
1074
1075    pub(crate) fn accept_remote_block(&self, chain: &ChainController, remote_block: RemoteBlock) {
1076        {
1077            let entry = self
1078                .shared()
1079                .block_status_map()
1080                .entry(remote_block.block.header().hash());
1081            if let dashmap::mapref::entry::Entry::Vacant(entry) = entry {
1082                entry.insert(BlockStatus::BLOCK_RECEIVED);
1083            }
1084        }
1085
1086        chain.asynchronous_process_remote_block(remote_block)
1087    }
1088
1089    /// Sync a new valid header, try insert to sync state
1090    // Update the header_map
1091    // Update the block_status_map
1092    // Update the shared_best_header if need
1093    // Update the peer's best_known_header
1094    pub fn insert_valid_header(&self, peer: PeerIndex, header: &core::HeaderView) {
1095        let tip_number = self.active_chain().tip_number();
1096        let store_first = tip_number >= header.number();
        // We don't clone header.parent_hash() here because that would hold the Arc counter of the
        // SendHeaders message, which would keep the 2000 headers in memory for a long time.
1099        let parent_hash = Byte32::from_slice(header.data().raw().parent_hash().as_slice())
1100            .expect("checked slice length");
1101        let parent_header_index = self
1102            .get_header_index_view(&parent_hash, store_first)
1103            .expect("parent should be verified");
1104        let mut header_view = HeaderIndexView::new(
1105            header.hash(),
1106            header.number(),
1107            header.epoch(),
1108            header.timestamp(),
1109            parent_hash,
1110            parent_header_index.total_difficulty() + header.difficulty(),
1111        );
1112
1113        let snapshot = Arc::clone(&self.shared.snapshot());
1114        header_view.build_skip(
1115            tip_number,
1116            |hash, store_first| self.get_header_index_view(hash, store_first),
1117            |number, current| {
1118                // shortcut to return an ancestor block
1119                if current.number <= snapshot.tip_number() && snapshot.is_main_chain(&current.hash)
1120                {
1121                    snapshot
1122                        .get_block_hash(number)
1123                        .and_then(|hash| self.get_header_index_view(&hash, true))
1124                } else {
1125                    None
1126                }
1127            },
1128        );
1129        self.shared.header_map().insert(header_view.clone());
1130        self.state
1131            .peers()
1132            .may_set_best_known_header(peer, header_view.as_header_index());
1133        self.state.may_set_shared_best_header(header_view);
1134    }
1135
1136    pub(crate) fn get_header_index_view(
1137        &self,
1138        hash: &Byte32,
1139        store_first: bool,
1140    ) -> Option<HeaderIndexView> {
1141        let store = self.store();
1142        if store_first {
1143            store
1144                .get_block_header(hash)
1145                .and_then(|header| {
1146                    store
1147                        .get_block_ext(hash)
1148                        .map(|block_ext| (header, block_ext.total_difficulty).into())
1149                })
1150                .or_else(|| self.shared.header_map().get(hash))
1151        } else {
1152            self.shared.header_map().get(hash).or_else(|| {
1153                store.get_block_header(hash).and_then(|header| {
1154                    store
1155                        .get_block_ext(hash)
1156                        .map(|block_ext| (header, block_ext.total_difficulty).into())
1157                })
1158            })
1159        }
1160    }
1161
1162    /// Check whether block has been inserted to chain store
1163    pub fn is_stored(&self, hash: &packed::Byte32) -> bool {
1164        let status = self.active_chain().get_block_status(hash);
1165        status.contains(BlockStatus::BLOCK_STORED)
1166    }
1167
1168    /// Get epoch ext by block hash
1169    pub fn get_epoch_ext(&self, hash: &Byte32) -> Option<EpochExt> {
1170        self.store().get_block_epoch(hash)
1171    }
1172
1173    /// Insert peer's unknown_header_list
1174    pub fn insert_peer_unknown_header_list(&self, pi: PeerIndex, header_list: Vec<Byte32>) {
1175        // update peer's unknown_header_list only once
1176        if self.state().peers.unknown_header_list_is_empty(pi) {
            // the header list is ordered from highest to lowest,
            // so once a known header is found we can discard the rest and exit early
1179            for hash in header_list {
1180                if let Some(header) = self.shared().header_map().get(&hash) {
1181                    self.state()
1182                        .peers
1183                        .may_set_best_known_header(pi, header.as_header_index());
1184                    break;
1185                } else {
1186                    self.state().peers.insert_unknown_header_hash(pi, hash)
1187                }
1188            }
1189        }
1190    }
1191
    /// Returns true when the block is one that we requested and has been received for the first time.
1193    pub fn new_block_received(&self, block: &core::BlockView) -> bool {
1194        if !self
1195            .state()
1196            .write_inflight_blocks()
1197            .remove_by_block((block.number(), block.hash()).into())
1198        {
1199            return false;
1200        }
1201
1202        let status = self.active_chain().get_block_status(&block.hash());
1203        debug!(
1204            "new_block_received {}-{}, status: {:?}",
1205            block.number(),
1206            block.hash(),
1207            status
1208        );
1209        if !BlockStatus::HEADER_VALID.eq(&status) {
1210            return false;
1211        }
1212
1213        if let dashmap::mapref::entry::Entry::Vacant(status) =
1214            self.shared().block_status_map().entry(block.hash())
1215        {
1216            status.insert(BlockStatus::BLOCK_RECEIVED);
1217            return true;
1218        }
1219        false
1220    }
1221}
1222
1223impl HeaderFieldsProvider for SyncShared {
1224    fn get_header_fields(&self, hash: &Byte32) -> Option<HeaderFields> {
1225        self.shared
1226            .header_map()
1227            .get(hash)
1228            .map(|header| HeaderFields {
1229                hash: header.hash(),
1230                number: header.number(),
1231                epoch: header.epoch(),
1232                timestamp: header.timestamp(),
1233                parent_hash: header.parent_hash(),
1234            })
1235            .or_else(|| {
1236                self.store()
1237                    .get_block_header(hash)
1238                    .map(|header| HeaderFields {
1239                        hash: header.hash(),
1240                        number: header.number(),
1241                        epoch: header.epoch(),
1242                        timestamp: header.timestamp(),
1243                        parent_hash: header.parent_hash(),
1244                    })
1245            })
1246    }
1247}
1248
1249#[derive(Eq, PartialEq, Clone)]
1250pub struct UnknownTxHashPriority {
1251    request_time: Instant,
1252    peers: Vec<PeerIndex>,
1253    requested: bool,
1254}
1255
1256impl UnknownTxHashPriority {
1257    pub fn should_request(&self, now: Instant) -> bool {
1258        self.next_request_at() < now
1259    }
1260
1261    pub fn next_request_at(&self) -> Instant {
1262        if self.requested {
1263            self.request_time + RETRY_ASK_TX_TIMEOUT_INCREASE
1264        } else {
1265            self.request_time
1266        }
1267    }
1268
1269    pub fn next_request_peer(&mut self) -> Option<PeerIndex> {
1270        if self.requested {
1271            if self.peers.len() > 1 {
1272                self.request_time = Instant::now();
1273                self.peers.swap_remove(0);
1274                self.peers.first().cloned()
1275            } else {
1276                None
1277            }
1278        } else {
1279            self.requested = true;
1280            self.peers.first().cloned()
1281        }
1282    }
1283
1284    pub fn push_peer(&mut self, peer_index: PeerIndex) {
1285        self.peers.push(peer_index);
1286    }
1287
1288    pub fn requesting_peer(&self) -> Option<PeerIndex> {
1289        if self.requested {
1290            self.peers.first().cloned()
1291        } else {
1292            None
1293        }
1294    }
1295}
1296
1297impl Ord for UnknownTxHashPriority {
1298    fn cmp(&self, other: &Self) -> cmp::Ordering {
1299        self.next_request_at()
1300            .cmp(&other.next_request_at())
1301            .reverse()
1302    }
1303}
1304
1305impl PartialOrd for UnknownTxHashPriority {
1306    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
1307        Some(self.cmp(other))
1308    }
1309}
1310
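// Illustrative sketch for documentation (not part of the upstream module): retry scheduling
// for an unknown transaction hash announced by two peers. Building a PeerIndex from a usize
// is an assumption; RETRY_ASK_TX_TIMEOUT_INCREASE comes from ckb_constant::sync.
#[cfg(test)]
mod unknown_tx_hash_priority_example {
    use super::*;

    #[test]
    fn retry_rotates_to_the_next_peer() {
        let peer_a: PeerIndex = 1usize.into();
        let peer_b: PeerIndex = 2usize.into();
        let mut priority = UnknownTxHashPriority {
            request_time: Instant::now(),
            peers: vec![peer_a, peer_b],
            requested: false,
        };

        // Not requested yet: ask the first peer right away.
        assert_eq!(priority.next_request_peer(), Some(peer_a));
        // After the first request, the retry is delayed by RETRY_ASK_TX_TIMEOUT_INCREASE.
        assert!(!priority.should_request(Instant::now()));
        // A retry falls through to the remaining peer.
        assert_eq!(priority.next_request_peer(), Some(peer_b));
        // No peers are left to try afterwards.
        assert_eq!(priority.next_request_peer(), None);
    }
}
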
1311pub struct SyncState {
1312    /* Status irrelevant to peers */
1313    shared_best_header: RwLock<HeaderIndexView>,
1314    tx_filter: Mutex<TtlFilter<Byte32>>,
1315
    // The priority is ordered by timestamp (reversed), meaning: do not ask for the tx before this timestamp (timeout).
1317    unknown_tx_hashes: Mutex<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>>,
1318
1319    /* Status relevant to peers */
1320    peers: Peers,
1321
1322    /* Cached items which we had received but not completely process */
1323    pending_get_block_proposals: DashMap<packed::ProposalShortId, HashSet<PeerIndex>>,
1324    pending_get_headers: RwLock<LruCache<(PeerIndex, Byte32), Instant>>,
1325    pending_compact_blocks: tokio::sync::Mutex<PendingCompactBlockMap>,
1326
1327    /* In-flight items for which we request to peers, but not got the responses yet */
1328    inflight_proposals: DashMap<packed::ProposalShortId, BlockNumber>,
1329    inflight_blocks: RwLock<InflightBlocks>,
1330
1331    /* cached for sending bulk */
1332    tx_relay_receiver: Receiver<TxVerificationResult>,
1333    min_chain_work: U256,
1334}
1335
1336impl SyncState {
1337    pub fn min_chain_work(&self) -> &U256 {
1338        &self.min_chain_work
1339    }
1340
1341    pub fn min_chain_work_ready(&self) -> bool {
1342        self.shared_best_header
1343            .read()
1344            .is_better_than(&self.min_chain_work)
1345    }
1346
1347    pub fn n_sync_started(&self) -> &AtomicUsize {
1348        &self.peers.n_sync_started
1349    }
1350
1351    pub fn peers(&self) -> &Peers {
1352        &self.peers
1353    }
1354
1355    pub fn compare_with_pending_compact(&self, hash: &Byte32, now: u64) -> bool {
1356        let pending = self.pending_compact_blocks.blocking_lock();
        // Sync can create tasks when pending is empty or more than 2 s have passed since the compact block request
1358        pending.is_empty()
1359            || pending
1360                .get(hash)
1361                .map(|(_, _, time)| now > time + 2000)
1362                .unwrap_or(true)
1363    }
1364
1365    pub async fn pending_compact_blocks(
1366        &self,
1367    ) -> tokio::sync::MutexGuard<'_, PendingCompactBlockMap> {
1368        self.pending_compact_blocks.lock().await
1369    }
1370
1371    pub fn read_inflight_blocks(&self) -> RwLockReadGuard<InflightBlocks> {
1372        self.inflight_blocks.read()
1373    }
1374
1375    pub fn write_inflight_blocks(&self) -> RwLockWriteGuard<InflightBlocks> {
1376        self.inflight_blocks.write()
1377    }
1378
1379    pub fn take_relay_tx_verify_results(&self, limit: usize) -> Vec<TxVerificationResult> {
1380        self.tx_relay_receiver.try_iter().take(limit).collect()
1381    }
1382
1383    pub fn shared_best_header(&self) -> HeaderIndexView {
1384        self.shared_best_header.read().to_owned()
1385    }
1386
1387    pub fn shared_best_header_ref(&self) -> RwLockReadGuard<HeaderIndexView> {
1388        self.shared_best_header.read()
1389    }
1390
1391    pub fn may_set_shared_best_header(&self, header: HeaderIndexView) {
1392        let mut shared_best_header = self.shared_best_header.write();
1393        if !header.is_better_than(shared_best_header.total_difficulty()) {
1394            return;
1395        }
1396
1397        if let Some(metrics) = ckb_metrics::handle() {
1398            metrics.ckb_shared_best_number.set(header.number() as i64);
1399        }
1400        *shared_best_header = header;
1401    }
1402
1403    pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) {
1404        if peer_state.sync_started() {
1405            assert_ne!(
1406                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
1407                0,
1408                "n_sync_started overflow when suspend_sync"
1409            );
1410        }
1411        peer_state.suspend_sync(SUSPEND_SYNC_TIME);
1412    }
1413
1414    pub(crate) fn tip_synced(&self, peer_state: &mut PeerState) {
1415        if peer_state.sync_started() {
1416            assert_ne!(
1417                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
1418                0,
1419                "n_sync_started overflow when tip_synced"
1420            );
1421        }
1422        peer_state.tip_synced();
1423    }
1424
1425    pub fn mark_as_known_tx(&self, hash: Byte32) {
1426        self.mark_as_known_txs(iter::once(hash));
1427    }
1428
1429    pub fn remove_from_known_txs(&self, hash: &Byte32) {
1430        self.tx_filter.lock().remove(hash);
1431    }
1432
1433    // maybe someday we can use
1434    // where T: Iterator<Item=Byte32>,
1435    // for<'a> &'a T: Iterator<Item=&'a Byte32>,
1436    pub fn mark_as_known_txs(&self, hashes: impl Iterator<Item = Byte32> + std::clone::Clone) {
1437        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
1438        let mut tx_filter = self.tx_filter.lock();
1439
1440        for hash in hashes {
1441            unknown_tx_hashes.remove(&hash);
1442            tx_filter.insert(hash);
1443        }
1444    }
1445
1446    pub fn pop_ask_for_txs(&self) -> HashMap<PeerIndex, Vec<Byte32>> {
1447        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
1448        let mut result: HashMap<PeerIndex, Vec<Byte32>> = HashMap::new();
1449        let now = Instant::now();
1450
1451        if !unknown_tx_hashes
1452            .peek()
1453            .map(|(_tx_hash, priority)| priority.should_request(now))
1454            .unwrap_or_default()
1455        {
1456            return result;
1457        }
1458
1459        while let Some((tx_hash, mut priority)) = unknown_tx_hashes.pop() {
1460            if priority.should_request(now) {
1461                if let Some(peer_index) = priority.next_request_peer() {
1462                    result
1463                        .entry(peer_index)
1464                        .and_modify(|hashes| hashes.push(tx_hash.clone()))
1465                        .or_insert_with(|| vec![tx_hash.clone()]);
1466                    unknown_tx_hashes.push(tx_hash, priority);
1467                }
1468            } else {
1469                unknown_tx_hashes.push(tx_hash, priority);
1470                break;
1471            }
1472        }
1473        result
1474    }
1475
1476    pub fn add_ask_for_txs(&self, peer_index: PeerIndex, tx_hashes: Vec<Byte32>) -> Status {
1477        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
1478
1479        for tx_hash in tx_hashes
1480            .into_iter()
1481            .take(MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER)
1482        {
1483            match unknown_tx_hashes.entry(tx_hash) {
1484                keyed_priority_queue::Entry::Occupied(entry) => {
1485                    let mut priority = entry.get_priority().clone();
1486                    priority.push_peer(peer_index);
1487                    entry.set_priority(priority);
1488                }
1489                keyed_priority_queue::Entry::Vacant(entry) => {
1490                    entry.set_priority(UnknownTxHashPriority {
1491                        request_time: Instant::now(),
1492                        peers: vec![peer_index],
1493                        requested: false,
1494                    })
1495                }
1496            }
1497        }
1498
        // Check the length of `unknown_tx_hashes` after inserting the newly arrived `tx_hashes`
1500        if unknown_tx_hashes.len() >= MAX_UNKNOWN_TX_HASHES_SIZE
1501            || unknown_tx_hashes.len()
1502                >= self.peers.state.len() * MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER
1503        {
1504            warn!(
1505                "unknown_tx_hashes is too long, len: {}",
1506                unknown_tx_hashes.len()
1507            );
1508
1509            let mut peer_unknown_counter = 0;
1510            for (_hash, priority) in unknown_tx_hashes.iter() {
1511                for peer in priority.peers.iter() {
1512                    if *peer == peer_index {
1513                        peer_unknown_counter += 1;
1514                    }
1515                }
1516            }
1517            if peer_unknown_counter >= MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER {
1518                return StatusCode::TooManyUnknownTransactions.into();
1519            }
1520
1521            return Status::ignored();
1522        }
1523
1524        Status::ok()
1525    }
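
    // Illustrative sketch (not part of the original code): how these helpers are
    // typically combined by a relay-style protocol handler. The surrounding
    // `state`, `nc`, and `peer` variables, and the handler shape, are assumptions
    // made for this example only.
    //
    //     // On receiving announced hashes from `peer`:
    //     let unknown: Vec<Byte32> = hashes
    //         .into_iter()
    //         .filter(|h| !state.already_known_tx(h))
    //         .collect();
    //     let _status = state.add_ask_for_txs(peer, unknown);
    //
    //     // On a periodic timer, turn due entries into per-peer requests:
    //     for (peer_index, tx_hashes) in state.pop_ask_for_txs() {
    //         // send a GetRelayTransactions-style request for `tx_hashes` to `peer_index`
    //     }
    //
    //     // Once the transactions actually arrive:
    //     state.mark_as_known_txs(received_hashes.into_iter());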

    pub fn already_known_tx(&self, hash: &Byte32) -> bool {
        self.tx_filter.lock().contains(hash)
    }

    pub fn tx_filter(&self) -> MutexGuard<TtlFilter<Byte32>> {
        self.tx_filter.lock()
    }

    pub fn unknown_tx_hashes(
        &self,
    ) -> MutexGuard<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>> {
        self.unknown_tx_hashes.lock()
    }

    // Returns, for each id, whether it was newly inserted or refreshed to a
    // later block number.
    pub fn insert_inflight_proposals(
        &self,
        ids: Vec<packed::ProposalShortId>,
        block_number: BlockNumber,
    ) -> Vec<bool> {
        ids.into_iter()
            .map(|id| match self.inflight_proposals.entry(id) {
                dashmap::mapref::entry::Entry::Occupied(mut occupied) => {
                    if *occupied.get() < block_number {
                        occupied.insert(block_number);
                        true
                    } else {
                        false
                    }
                }
                dashmap::mapref::entry::Entry::Vacant(vacant) => {
                    vacant.insert(block_number);
                    true
                }
            })
            .collect()
    }

    pub fn remove_inflight_proposals(&self, ids: &[packed::ProposalShortId]) -> Vec<bool> {
        ids.iter()
            .map(|id| self.inflight_proposals.remove(id).is_some())
            .collect()
    }

    pub fn clear_expired_inflight_proposals(&self, keep_min_block_number: BlockNumber) {
        self.inflight_proposals
            .retain(|_, block_number| *block_number >= keep_min_block_number);
    }

    pub fn contains_inflight_proposal(&self, proposal_id: &packed::ProposalShortId) -> bool {
        self.inflight_proposals.contains_key(proposal_id)
    }
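
    // Illustrative sketch (assumption, not original code): tracking proposal ids
    // while a block-proposal request is outstanding, keyed by the block number
    // that wanted them, and pruning entries once blocks below a threshold no
    // longer need them. `ids`, `block_number`, `tip_number`, and `window` are
    // hypothetical names for this example.
    //
    //     let newly_added = state.insert_inflight_proposals(ids.clone(), block_number);
    //     let to_request: Vec<_> = ids
    //         .into_iter()
    //         .zip(newly_added)
    //         .filter_map(|(id, added)| added.then_some(id))
    //         .collect();
    //     // ... request `to_request` from a peer ...
    //
    //     // When the proposals arrive (or are no longer needed):
    //     state.remove_inflight_proposals(&to_request);
    //     // Periodically drop stale entries:
    //     state.clear_expired_inflight_proposals(tip_number.saturating_sub(window));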

    pub fn drain_get_block_proposals(
        &self,
    ) -> DashMap<packed::ProposalShortId, HashSet<PeerIndex>> {
        let ret = self.pending_get_block_proposals.clone();
        self.pending_get_block_proposals.clear();
        ret
    }

    pub fn insert_get_block_proposals(&self, pi: PeerIndex, ids: Vec<packed::ProposalShortId>) {
        for id in ids.into_iter() {
            self.pending_get_block_proposals
                .entry(id)
                .or_default()
                .insert(pi);
        }
    }

    // Disconnect this peer and remove its in-flight blocks.
    //
    // TODO: record the peer's connection duration (disconnect time - connection
    // established time) and report it to ckb_metrics
    pub fn disconnected(&self, pi: PeerIndex) {
        let removed_inflight_blocks_count = self.write_inflight_blocks().remove_by_peer(pi);
        if removed_inflight_blocks_count > 0 {
            debug!(
                "disconnected {}, removed {} inflight blocks",
                pi, removed_inflight_blocks_count
            )
        }
        self.peers().disconnected(pi);
    }
}

/** ActiveChain captures a point-in-time view of the indexed chain of blocks. */
#[derive(Clone)]
pub struct ActiveChain {
    sync_shared: SyncShared,
    snapshot: Arc<Snapshot>,
}

#[doc(hidden)]
impl ActiveChain {
    pub(crate) fn sync_shared(&self) -> &SyncShared {
        &self.sync_shared
    }

    pub fn shared(&self) -> &Shared {
        self.sync_shared.shared()
    }

    fn store(&self) -> &ChainDB {
        self.sync_shared.store()
    }

    pub fn state(&self) -> &SyncState {
        self.sync_shared.state()
    }

    fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }

    pub fn get_block_hash(&self, number: BlockNumber) -> Option<packed::Byte32> {
        self.snapshot().get_block_hash(number)
    }

    pub fn get_block(&self, h: &packed::Byte32) -> Option<core::BlockView> {
        self.store().get_block(h)
    }

    pub fn get_block_header(&self, h: &packed::Byte32) -> Option<core::HeaderView> {
        self.store().get_block_header(h)
    }

    pub fn get_block_ext(&self, h: &packed::Byte32) -> Option<core::BlockExt> {
        self.snapshot().get_block_ext(h)
    }

    pub fn get_block_filter(&self, hash: &packed::Byte32) -> Option<packed::Bytes> {
        self.store().get_block_filter(hash)
    }

    pub fn get_block_filter_hash(&self, hash: &packed::Byte32) -> Option<packed::Byte32> {
        self.store().get_block_filter_hash(hash)
    }

    pub fn get_latest_built_filter_block_number(&self) -> BlockNumber {
        self.snapshot
            .get_latest_built_filter_data_block_hash()
            .and_then(|hash| self.snapshot.get_block_number(&hash))
            .unwrap_or_default()
    }

    pub fn total_difficulty(&self) -> &U256 {
        self.snapshot.total_difficulty()
    }

    pub fn tip_header(&self) -> core::HeaderView {
        self.snapshot.tip_header().clone()
    }

    pub fn tip_hash(&self) -> Byte32 {
        self.snapshot.tip_hash()
    }

    pub fn tip_number(&self) -> BlockNumber {
        self.snapshot.tip_number()
    }

    pub fn epoch_ext(&self) -> core::EpochExt {
        self.snapshot.epoch_ext().clone()
    }

    pub fn is_main_chain(&self, hash: &packed::Byte32) -> bool {
        self.snapshot.is_main_chain(hash)
    }

    pub fn is_unverified_chain(&self, hash: &packed::Byte32) -> bool {
        self.store().get_block_epoch_index(hash).is_some()
    }

    pub fn is_initial_block_download(&self) -> bool {
        self.shared().is_initial_block_download()
    }

    pub fn unverified_tip_header(&self) -> HeaderIndex {
        self.shared().get_unverified_tip()
    }

    pub fn unverified_tip_hash(&self) -> Byte32 {
        self.unverified_tip_header().hash()
    }

    pub fn unverified_tip_number(&self) -> BlockNumber {
        self.unverified_tip_header().number()
    }

    pub fn get_ancestor(&self, base: &Byte32, number: BlockNumber) -> Option<HeaderIndexView> {
        self.get_ancestor_internal(base, number, false)
    }

    pub fn get_ancestor_with_unverified(
        &self,
        base: &Byte32,
        number: BlockNumber,
    ) -> Option<HeaderIndexView> {
        self.get_ancestor_internal(base, number, true)
    }

    fn get_ancestor_internal(
        &self,
        base: &Byte32,
        number: BlockNumber,
        with_unverified: bool,
    ) -> Option<HeaderIndexView> {
        let tip_number = {
            if with_unverified {
                self.unverified_tip_number()
            } else {
                self.tip_number()
            }
        };

        let block_is_on_chain_fn = |hash: &Byte32| {
            if with_unverified {
                self.is_unverified_chain(hash)
            } else {
                self.is_main_chain(hash)
            }
        };

        let get_header_view_fn = |hash: &Byte32, store_first: bool| {
            self.sync_shared.get_header_index_view(hash, store_first)
        };

        let fast_scanner_fn = |number: BlockNumber, current: BlockNumberAndHash| {
            // Shortcut: once the walk reaches a block that is already on the
            // (possibly unverified) chain, the ancestor at `number` can be
            // looked up by height directly instead of walking parent by parent.
            if current.number <= tip_number && block_is_on_chain_fn(&current.hash) {
                self.get_block_hash(number)
                    .and_then(|hash| self.sync_shared.get_header_index_view(&hash, true))
            } else {
                None
            }
        };

        self.sync_shared
            .get_header_index_view(base, false)?
            .get_ancestor(tip_number, number, get_header_view_fn, fast_scanner_fn)
    }

    pub fn get_locator(&self, start: BlockNumberAndHash) -> Vec<Byte32> {
        let mut step = 1;
        let mut locator = Vec::with_capacity(32);
        let mut index = start.number();
        let mut base = start.hash();

        loop {
            let header_hash = self
                .get_ancestor(&base, index)
                .unwrap_or_else(|| {
                    panic!(
                        "index calculated in get_locator: \
                         start: {:?}, base: {}, step: {}, locators({}): {:?}.",
                        start,
                        base,
                        step,
                        locator.len(),
                        locator,
                    )
                })
                .hash();
            locator.push(header_hash.clone());

            if locator.len() >= 10 {
                step <<= 1;
            }

            if index < step * 2 {
                // Insert some low-height blocks into the locator so parallel IBD
                // block downloads can start quickly, while keeping the locator
                // reasonably small.
                //
                // 100 * 365 * 86400 / 8 = 394200000  (block count for ~100 years)
                // 2 ** 29 = 536870912
                // 2 ** 13 = 8192
                // 52 = 10 + 29 + 13
                if locator.len() < 52 && index > ONE_DAY_BLOCK_NUMBER {
                    index >>= 1;
                    base = header_hash;
                    continue;
                }
                // Always include the genesis hash.
                if index != 0 {
                    locator.push(self.sync_shared.consensus().genesis_hash());
                }
                break;
            }
            index -= step;
            base = header_hash;
        }
        locator
    }
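
    // Worked example (added for illustration; the heights are hypothetical).
    // For a start at height 1,000,000 the locator contains roughly 35 hashes:
    // the 10 most recent blocks (1,000,000 down to 999,991), then entries whose
    // spacing doubles each step (999,989, 999,985, 999,977, ..., 475,705), then,
    // once the doubling step would overshoot height 0, heights that are simply
    // halved each step (237,852, 118,926, ..., 7,432) until they fall to
    // ONE_DAY_BLOCK_NUMBER (8192) or below, and finally the genesis hash.
    // The dense low-height tail is what lets a responding peer serve headers
    // from near the bottom of the chain during IBD.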

    pub fn last_common_ancestor(
        &self,
        pa: &BlockNumberAndHash,
        pb: &BlockNumberAndHash,
    ) -> Option<BlockNumberAndHash> {
        let (mut m_left, mut m_right) = if pa.number() > pb.number() {
            (pb.clone(), pa.clone())
        } else {
            (pa.clone(), pb.clone())
        };

        m_right = self
            .get_ancestor(&m_right.hash(), m_left.number())?
            .number_and_hash();
        if m_left == m_right {
            return Some(m_left);
        }
        debug_assert!(m_left.number() == m_right.number());

        while m_left != m_right {
            m_left = self
                .get_ancestor(&m_left.hash(), m_left.number() - 1)?
                .number_and_hash();
            m_right = self
                .get_ancestor(&m_right.hash(), m_right.number() - 1)?
                .number_and_hash();
        }
        Some(m_left)
    }
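
    // Illustrative note (added): `last_common_ancestor` first walks the higher
    // point down to the lower point's height, then steps both back one block at
    // a time until they meet. For example, with a fork
    //
    //     genesis <- 1 <- 2 <- 3a          (pa at 3a)
    //                     \-- 3b <- 4b     (pb at 4b)
    //
    // pb is first reduced to height 3 (block 3b); 3a and 3b differ, so both
    // sides step back until they converge on block 2, which is returned.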

    pub fn locate_latest_common_block(
        &self,
        _hash_stop: &Byte32,
        locator: &[Byte32],
    ) -> Option<BlockNumber> {
        if locator.is_empty() {
            return None;
        }

        let locator_hash = locator.last().expect("empty checked");
        if locator_hash != &self.sync_shared.consensus().genesis_hash() {
            return None;
        }

        // Iterators are lazy, so lookups only happen until the first locator
        // entry we know about is found.
        let (index, latest_common) = locator
            .iter()
            .enumerate()
            .map(|(index, hash)| (index, self.snapshot.get_block_number(hash)))
            .find(|(_index, number)| number.is_some())
            .expect("locator last checked");

        if index == 0 || latest_common == Some(0) {
            return latest_common;
        }

        if let Some(header) = locator
            .get(index - 1)
            .and_then(|hash| self.sync_shared.store().get_block_header(hash))
        {
            let mut block_hash = header.data().raw().parent_hash();
            loop {
                let block_header = match self.sync_shared.store().get_block_header(&block_hash) {
                    None => break latest_common,
                    Some(block_header) => block_header,
                };

                if let Some(block_number) = self.snapshot.get_block_number(&block_hash) {
                    return Some(block_number);
                }

                block_hash = block_header.data().raw().parent_hash();
            }
        } else {
            latest_common
        }
    }

    pub fn get_locator_response(
        &self,
        block_number: BlockNumber,
        hash_stop: &Byte32,
    ) -> Vec<core::HeaderView> {
        let tip_number = self.tip_header().number();
        let max_height = cmp::min(
            block_number + 1 + MAX_HEADERS_LEN as BlockNumber,
            tip_number + 1,
        );
        (block_number + 1..max_height)
            .filter_map(|block_number| self.snapshot.get_block_hash(block_number))
            .take_while(|block_hash| block_hash != hash_stop)
            .filter_map(|block_hash| self.sync_shared.store().get_block_header(&block_hash))
            .collect()
    }
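
    // Sketch of how the two sides fit together during headers sync (illustrative,
    // not original code; `active_chain`, `hash_stop`, and `locator` on the
    // responding node are assumed names for this example):
    //
    //     // Requesting node: build a locator from its current tip and send it
    //     // inside a GetHeaders message (see send_getheaders_to_peer below).
    //
    //     // Responding node: find the highest locator entry on its main chain,
    //     // then answer with up to MAX_HEADERS_LEN of the following headers.
    //     if let Some(common) = active_chain.locate_latest_common_block(&hash_stop, &locator) {
    //         let headers = active_chain.get_locator_response(common, &hash_stop);
    //         // ... pack `headers` into a SendHeaders message ...
    //     }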

    pub fn send_getheaders_to_peer(
        &self,
        nc: &Arc<dyn CKBProtocolContext + Sync>,
        peer: PeerIndex,
        block_number_and_hash: BlockNumberAndHash,
    ) {
        if let Some(last_time) = self
            .state()
            .pending_get_headers
            .write()
            .get(&(peer, block_number_and_hash.hash()))
        {
            if Instant::now() < *last_time + GET_HEADERS_TIMEOUT {
                debug!(
                    "Last get_headers request to peer {} was sent less than {:?} ago; ignoring this one.",
                    peer, GET_HEADERS_TIMEOUT,
                );
                return;
            } else {
                debug!(
                    "No headers received from {} within {:?}; retrying",
                    peer, GET_HEADERS_TIMEOUT,
                );
            }
        }
        self.state()
            .pending_get_headers
            .write()
            .put((peer, block_number_and_hash.hash()), Instant::now());

        debug!(
            "send_getheaders_to_peer peer={}, hash={}",
            peer,
            block_number_and_hash.hash()
        );
        let locator_hash = self.get_locator(block_number_and_hash);
        let content = packed::GetHeaders::new_builder()
            .block_locator_hashes(locator_hash)
            .hash_stop(packed::Byte32::zero())
            .build();
        let message = packed::SyncMessage::new_builder().set(content).build();
        let nc = Arc::clone(nc);
        self.shared().async_handle().spawn(async move {
            async_send_message(SupportProtocols::Sync.protocol_id(), &nc, peer, &message).await
        });
    }
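
    // Illustrative usage (assumption; `better_tip` is a hypothetical
    // HeaderIndexView): repeated calls for the same (peer, hash) pair within
    // GET_HEADERS_TIMEOUT are dropped by the pending_get_headers check above.
    //
    //     active_chain.send_getheaders_to_peer(&nc, peer, better_tip.number_and_hash());
    //     // Calling again immediately with the same hash is a no-op until the
    //     // timeout elapses:
    //     active_chain.send_getheaders_to_peer(&nc, peer, better_tip.number_and_hash());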

    pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
        self.shared().get_block_status(block_hash)
    }

    pub fn contains_block_status(&self, block_hash: &Byte32, status: BlockStatus) -> bool {
        self.get_block_status(block_hash).contains(status)
    }
}

/// The `IBDState` enum represents whether the node is currently in the IBD (initial block
/// download) process (`In`) or has completed it (`Out`).
#[derive(Clone, Copy, Debug)]
pub enum IBDState {
    In,
    Out,
}

impl From<bool> for IBDState {
    fn from(src: bool) -> Self {
        if src { IBDState::In } else { IBDState::Out }
    }
}

impl From<IBDState> for bool {
    fn from(s: IBDState) -> bool {
        match s {
            IBDState::In => true,
            IBDState::Out => false,
        }
    }
}

pub(crate) fn post_sync_process(
    nc: &dyn CKBProtocolContext,
    peer: PeerIndex,
    item_name: &str,
    status: Status,
) {
    if let Some(ban_time) = status.should_ban() {
        error!(
            "Received {} from {}; banning for {:?}: {}",
            item_name, peer, ban_time, status
        );
        nc.ban_peer(peer, ban_time, status.to_string());
    } else if status.should_warn() {
        warn!("Received {} from {}, {}", item_name, peer, status);
    } else if !status.is_ok() {
        debug!("Received {} from {}, {}", item_name, peer, status);
    }
}
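
// Illustrative sketch (assumption, not original code): a protocol handler would
// typically funnel every message-processing result through `post_sync_process`
// so that banning, warning, and debug logging stay consistent. The handler name
// below is hypothetical.
//
//     let status = process_send_headers(...); // hypothetical handler returning `Status`
//     post_sync_process(nc.as_ref(), peer, "SendHeaders", status);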