use crate::{FAST_INDEX, LOW_INDEX, NORMAL_INDEX, Status, StatusCode, TIME_TRACE_SIZE};
use ckb_app_config::SyncConfig;
#[cfg(test)]
use ckb_chain::VerifyResult;
use ckb_chain::{ChainController, RemoteBlock};
use ckb_chain_spec::consensus::{Consensus, MAX_BLOCK_INTERVAL, MIN_BLOCK_INTERVAL};
use ckb_channel::Receiver;
use ckb_constant::sync::{
    BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW,
    HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
    MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
    MAX_UNKNOWN_TX_HASHES_SIZE, MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER, POW_INTERVAL,
    RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
};
use ckb_logger::{debug, error, info, trace, warn};
use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
use ckb_shared::{
    Snapshot,
    block_status::BlockStatus,
    shared::Shared,
    types::{HeaderIndex, HeaderIndexView, SHRINK_THRESHOLD},
};
use ckb_store::{ChainDB, ChainStore};
use ckb_systemtime::unix_time_as_millis;
use ckb_traits::{HeaderFields, HeaderFieldsProvider};
use ckb_tx_pool::service::TxVerificationResult;
use ckb_types::BlockNumberAndHash;
use ckb_types::{
    U256,
    core::{self, BlockNumber, EpochExt},
    packed::{self, Byte32},
    prelude::*,
};
use ckb_util::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, shrink_to_fit};
use dashmap::{self, DashMap};
use keyed_priority_queue::{self, KeyedPriorityQueue};
use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet, btree_map::Entry};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::{cmp, fmt, iter};

use crate::utils::send_message;

const GET_HEADERS_CACHE_SIZE: usize = 10000;
const GET_HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const FILTER_SIZE: usize = 50000;
const ONE_DAY_BLOCK_NUMBER: u64 = 8192;
pub(crate) const FILTER_TTL: u64 = 4 * 60 * 60;

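/// Per-peer headers-sync bookkeeping. The field notes below are editorial
/// summaries inferred from how the fields are used in this file, not original
/// upstream documentation: `timeout` is the eviction deadline used by the
/// sync protocol, `work_header`/`total_difficulty` snapshot the chain work
/// being compared against the peer, `sent_getheaders` records whether a
/// `GetHeaders` request is outstanding, and `headers_sync_state` drives the
/// state machine defined below.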
#[derive(Clone, Debug, Default)]
pub struct ChainSyncState {
    pub timeout: u64,
    pub work_header: Option<core::HeaderView>,
    pub total_difficulty: Option<U256>,
    pub sent_getheaders: bool,
    headers_sync_state: HeadersSyncState,
}

impl ChainSyncState {
    fn can_start_sync(&self, now: u64) -> bool {
        match self.headers_sync_state {
            HeadersSyncState::Initialized => false,
            HeadersSyncState::SyncProtocolConnected => true,
            HeadersSyncState::Started => false,
            HeadersSyncState::Suspend(until) | HeadersSyncState::TipSynced(until) => until < now,
        }
    }

    fn connected(&mut self) {
        self.headers_sync_state = HeadersSyncState::SyncProtocolConnected;
    }

    fn start(&mut self) {
        self.headers_sync_state = HeadersSyncState::Started
    }

    fn suspend(&mut self, until: u64) {
        self.headers_sync_state = HeadersSyncState::Suspend(until)
    }

    fn tip_synced(&mut self) {
        let now = unix_time_as_millis();
        let avg_interval = (MAX_BLOCK_INTERVAL + MIN_BLOCK_INTERVAL) / 2;
        self.headers_sync_state = HeadersSyncState::TipSynced(now + avg_interval * 1000);
    }

    fn started(&self) -> bool {
        matches!(self.headers_sync_state, HeadersSyncState::Started)
    }

    fn started_or_tip_synced(&self) -> bool {
        matches!(
            self.headers_sync_state,
            HeadersSyncState::Started | HeadersSyncState::TipSynced(_)
        )
    }
}

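/// Lifecycle of a peer's headers synchronization. A sketch of the intended
/// transitions, inferred from the methods on `ChainSyncState` above:
///
/// ```text
/// Initialized --connected()--> SyncProtocolConnected --start()--> Started
/// Started --suspend(until)--> Suspend(until)   (retry once `until < now`)
/// Started --tip_synced()----> TipSynced(until) (re-check after ~one average block interval)
/// ```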
#[derive(Default, Clone, Debug)]
enum HeadersSyncState {
    #[default]
    Initialized,
    SyncProtocolConnected,
    Started,
    Suspend(u64),
    TipSynced(u64),
}

#[derive(Clone, Default, Debug, Copy)]
pub struct PeerFlags {
    pub is_outbound: bool,
    pub is_protect: bool,
    pub is_whitelist: bool,
    pub is_2023edition: bool,
}

#[derive(Clone, Default, Debug, Copy)]
pub struct HeadersSyncController {
    pub(crate) started_ts: u64,
    pub(crate) started_tip_ts: u64,

    pub(crate) last_updated_ts: u64,
    pub(crate) last_updated_tip_ts: u64,

    pub(crate) is_close_to_the_end: bool,
}

impl HeadersSyncController {
    #[cfg(test)]
    pub(crate) fn new(
        started_ts: u64,
        started_tip_ts: u64,
        last_updated_ts: u64,
        last_updated_tip_ts: u64,
        is_close_to_the_end: bool,
    ) -> Self {
        Self {
            started_ts,
            started_tip_ts,
            last_updated_ts,
            last_updated_tip_ts,
            is_close_to_the_end,
        }
    }

    pub(crate) fn from_header(better_tip_header: &HeaderIndexView) -> Self {
        let started_ts = unix_time_as_millis();
        let started_tip_ts = better_tip_header.timestamp();
        Self {
            started_ts,
            started_tip_ts,
            last_updated_ts: started_ts,
            last_updated_tip_ts: started_tip_ts,
            is_close_to_the_end: false,
        }
    }

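    /// Checks whether headers download from this peer has stalled.
    ///
    /// An editorial summary inferred from the branches below and the call
    /// sites: `Some(true)` means the peer's tip timestamp advanced more
    /// slowly than `HEADERS_DOWNLOAD_HEADERS_PER_SECOND` implies (allowing
    /// `HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE` slack for a single
    /// sample), `Some(false)` means progress is acceptable or too little time
    /// has passed to judge, and `None` means the controller reset itself
    /// because the peer fell far behind again after having been close to the
    /// tip, so a fresh `GetHeaders` round should start.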
    #[allow(clippy::wrong_self_convention)]
    pub(crate) fn is_timeout(&mut self, now_tip_ts: u64, now: u64) -> Option<bool> {
        let inspect_window = HEADERS_DOWNLOAD_INSPECT_WINDOW;
        let expected_headers_per_sec = HEADERS_DOWNLOAD_HEADERS_PER_SECOND;
        let tolerable_bias = HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE;

        let expected_before_finished = now.saturating_sub(now_tip_ts);

        trace!("headers-sync: better tip ts {}; now {}", now_tip_ts, now);

        if self.is_close_to_the_end {
            let expected_in_base_time =
                expected_headers_per_sec * inspect_window * POW_INTERVAL / 1000;
            if expected_before_finished > expected_in_base_time {
                self.started_ts = now;
                self.started_tip_ts = now_tip_ts;
                self.last_updated_ts = now;
                self.last_updated_tip_ts = now_tip_ts;
                self.is_close_to_the_end = false;
                trace!(
                    "headers-sync: send GetHeaders again since we are significantly behind the tip"
                );
                None
            } else {
                Some(false)
            }
        } else if expected_before_finished < inspect_window {
            self.is_close_to_the_end = true;
            trace!("headers-sync: ignore timeout because the tip almost reaches the real time");
            Some(false)
        } else {
            let spent_since_last_updated = now.saturating_sub(self.last_updated_ts);

            if spent_since_last_updated < inspect_window {
                Some(false)
            } else {
                let synced_since_last_updated = now_tip_ts.saturating_sub(self.last_updated_tip_ts);
                let expected_since_last_updated =
                    expected_headers_per_sec * spent_since_last_updated * POW_INTERVAL / 1000;

                if synced_since_last_updated < expected_since_last_updated / tolerable_bias {
                    trace!("headers-sync: the instantaneous speed is too slow");
                    Some(true)
                } else {
                    self.last_updated_ts = now;
                    self.last_updated_tip_ts = now_tip_ts;

                    if synced_since_last_updated > expected_since_last_updated {
                        trace!("headers-sync: the instantaneous speed is acceptable");
                        Some(false)
                    } else {
                        let spent_since_started = now.saturating_sub(self.started_ts);
                        let synced_since_started = now_tip_ts.saturating_sub(self.started_tip_ts);

                        let expected_since_started =
                            expected_headers_per_sec * spent_since_started * POW_INTERVAL / 1000;

                        if synced_since_started < expected_since_started {
                            trace!(
                                "headers-sync: both the global average speed and the instantaneous speed \
                                 are slower than expected"
                            );
                            Some(true)
                        } else {
                            trace!("headers-sync: the global average speed is acceptable");
                            Some(false)
                        }
                    }
                }
            }
        }
    }
}

#[derive(Clone, Default, Debug)]
pub struct PeerState {
    pub headers_sync_controller: Option<HeadersSyncController>,
    pub peer_flags: PeerFlags,
    pub chain_sync: ChainSyncState,
    pub best_known_header: Option<HeaderIndex>,
    pub last_common_header: Option<BlockNumberAndHash>,
    pub unknown_header_list: Vec<Byte32>,
}

impl PeerState {
    pub fn new(peer_flags: PeerFlags) -> PeerState {
        PeerState {
            headers_sync_controller: None,
            peer_flags,
            chain_sync: ChainSyncState::default(),
            best_known_header: None,
            last_common_header: None,
            unknown_header_list: Vec::new(),
        }
    }

    pub fn can_start_sync(&self, now: u64, ibd: bool) -> bool {
        (self.peer_flags.is_protect || self.peer_flags.is_whitelist || !ibd)
            && self.chain_sync.can_start_sync(now)
    }

    pub fn start_sync(&mut self, headers_sync_controller: HeadersSyncController) {
        self.chain_sync.start();
        self.headers_sync_controller = Some(headers_sync_controller);
    }

    fn suspend_sync(&mut self, suspend_time: u64) {
        let now = unix_time_as_millis();
        self.chain_sync.suspend(now + suspend_time);
        self.headers_sync_controller = None;
    }

    fn tip_synced(&mut self) {
        self.chain_sync.tip_synced();
        self.headers_sync_controller = None;
    }

    pub(crate) fn sync_started(&self) -> bool {
        self.chain_sync.started()
    }

    pub(crate) fn started_or_tip_synced(&self) -> bool {
        self.chain_sync.started_or_tip_synced()
    }

    pub(crate) fn sync_connected(&mut self) {
        self.chain_sync.connected()
    }
}

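/// An LRU cache whose entries additionally carry an insertion timestamp, so
/// stale entries can be dropped in bulk via [`TtlFilter::remove_expired`].
///
/// A minimal usage sketch (illustrative only; `ttl` is in seconds):
///
/// ```ignore
/// let mut filter: TtlFilter<u32> = TtlFilter::new(100, 60);
/// assert!(filter.insert(1));  // newly inserted
/// assert!(!filter.insert(1)); // already present; timestamp refreshed
/// assert!(filter.contains(&1));
/// filter.remove_expired();    // drops entries older than 60 seconds
/// ```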
pub struct TtlFilter<T> {
    inner: LruCache<T, u64>,
    ttl: u64,
}

impl<T: Eq + Hash + Clone> Default for TtlFilter<T> {
    fn default() -> Self {
        TtlFilter::new(FILTER_SIZE, FILTER_TTL)
    }
}

impl<T: Eq + Hash + Clone> TtlFilter<T> {
    pub fn new(size: usize, ttl: u64) -> Self {
        Self {
            inner: LruCache::new(size),
            ttl,
        }
    }

    pub fn contains(&self, item: &T) -> bool {
        self.inner.contains(item)
    }

    pub fn insert(&mut self, item: T) -> bool {
        let now = ckb_systemtime::unix_time().as_secs();
        self.inner.put(item, now).is_none()
    }

    pub fn remove(&mut self, item: &T) -> bool {
        self.inner.pop(item).is_some()
    }

    pub fn remove_expired(&mut self) {
        let now = ckb_systemtime::unix_time().as_secs();
        let expired_keys: Vec<T> = self
            .inner
            .iter()
            .filter_map(|(key, time)| {
                if *time + self.ttl < now {
                    Some(key)
                } else {
                    None
                }
            })
            .cloned()
            .collect();

        for k in expired_keys {
            self.remove(&k);
        }
    }
}

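/// Shared registry of per-peer sync state, plus two counters kept consistent
/// with it: `n_sync_started` counts peers whose headers sync is currently in
/// the `Started` state, and `n_protected_outbound_peers` counts outbound
/// peers protected from eviction, capped at
/// `MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT`.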
#[derive(Default)]
pub struct Peers {
    pub state: DashMap<PeerIndex, PeerState>,
    pub n_sync_started: AtomicUsize,
    pub n_protected_outbound_peers: AtomicUsize,
}

#[derive(Debug, Clone)]
pub struct InflightState {
    pub(crate) peer: PeerIndex,
    pub(crate) timestamp: u64,
}

impl InflightState {
    fn new(peer: PeerIndex) -> Self {
        Self {
            peer,
            timestamp: unix_time_as_millis(),
        }
    }
}

enum TimeQuantile {
    MinToFast,
    FastToNormal,
    NormalToUpper,
    UpperToMax,
}

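/// Tracks recent block-download round-trip times (in milliseconds) and
/// classifies each new sample into a quantile band. Once `TIME_TRACE_SIZE`
/// samples accumulate, the trace is sorted and the `fast`/`normal`/`low`
/// thresholds are each updated as a running average (old threshold plus the
/// sampled quantile, halved), so the bands adapt to current network
/// conditions. A sketch of the classification in `push_time` below:
///
/// ```text
/// time <= fast_time               => MinToFast     (fast peer; quota may rise)
/// fast_time < time <= normal_time => FastToNormal
/// normal_time < time <= low_time  => NormalToUpper
/// time > low_time                 => UpperToMax    (slow peer; quota may drop)
/// ```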
#[derive(Clone)]
struct TimeAnalyzer {
    trace: [u64; TIME_TRACE_SIZE],
    index: usize,
    fast_time: u64,
    normal_time: u64,
    low_time: u64,
}

impl fmt::Debug for TimeAnalyzer {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("TimeAnalyzer")
            .field("fast_time", &self.fast_time)
            .field("normal_time", &self.normal_time)
            .field("low_time", &self.low_time)
            .finish()
    }
}

impl Default for TimeAnalyzer {
    fn default() -> Self {
        Self {
            trace: [0; TIME_TRACE_SIZE],
            index: 0,
            fast_time: 1000,
            normal_time: 1250,
            low_time: 1500,
        }
    }
}

impl TimeAnalyzer {
    fn push_time(&mut self, time: u64) -> TimeQuantile {
        if self.index < TIME_TRACE_SIZE {
            self.trace[self.index] = time;
            self.index += 1;
        } else {
            self.trace.sort_unstable();
            self.fast_time = (self.fast_time.saturating_add(self.trace[FAST_INDEX])) >> 1;
            self.normal_time = (self.normal_time.saturating_add(self.trace[NORMAL_INDEX])) >> 1;
            self.low_time = (self.low_time.saturating_add(self.trace[LOW_INDEX])) >> 1;
            self.trace[0] = time;
            self.index = 1;
        }

        if time <= self.fast_time {
            TimeQuantile::MinToFast
        } else if time <= self.normal_time {
            TimeQuantile::FastToNormal
        } else if time > self.low_time {
            TimeQuantile::UpperToMax
        } else {
            TimeQuantile::NormalToUpper
        }
    }
}

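/// Per-peer block-download quota. `task_count` starts at
/// `INIT_BLOCKS_IN_TRANSIT_PER_PEER` and is nudged up or down according to
/// the `TimeQuantile` of each completed download (see
/// `InflightBlocks::remove_by_block`), capped at
/// `MAX_BLOCKS_IN_TRANSIT_PER_PEER`; `punish` halves (exp = 1) or quarters
/// (exp = 2) the quota when a peer's downloads time out.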
#[derive(Debug, Clone)]
pub struct DownloadScheduler {
    task_count: usize,
    timeout_count: usize,
    hashes: HashSet<BlockNumberAndHash>,
}

impl Default for DownloadScheduler {
    fn default() -> Self {
        Self {
            hashes: HashSet::default(),
            task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER,
            timeout_count: 0,
        }
    }
}

impl DownloadScheduler {
    fn inflight_count(&self) -> usize {
        self.hashes.len()
    }

    fn can_fetch(&self) -> usize {
        self.task_count.saturating_sub(self.hashes.len())
    }

    pub(crate) const fn task_count(&self) -> usize {
        self.task_count
    }

    fn increase(&mut self, num: usize) {
        if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER {
            self.task_count = ::std::cmp::min(
                self.task_count.saturating_add(num),
                MAX_BLOCKS_IN_TRANSIT_PER_PEER,
            )
        }
    }

    fn decrease(&mut self, num: usize) {
        // Accumulate into `timeout_count` (not `task_count`), so the quota
        // only shrinks after the accumulated penalty exceeds the threshold.
        self.timeout_count = self.timeout_count.saturating_add(num);
        if self.timeout_count > 2 {
            self.task_count = self.task_count.saturating_sub(1);
            self.timeout_count = 0;
        }
    }

    fn punish(&mut self, exp: usize) {
        self.task_count >>= exp
    }
}

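/// The global registry of blocks currently being downloaded: which peer each
/// block was requested from (`inflight_states`), the per-peer quotas
/// (`download_schedulers`), and tracing data used by `prune` to detect
/// stalled downloads near the tip. `adjustment` toggles the adaptive quota
/// logic, and `restart_number` marks the highest block number whose download
/// was restarted after a trace timeout.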
#[derive(Clone)]
pub struct InflightBlocks {
    pub(crate) download_schedulers: HashMap<PeerIndex, DownloadScheduler>,
    inflight_states: BTreeMap<BlockNumberAndHash, InflightState>,
    pub(crate) trace_number: HashMap<BlockNumberAndHash, u64>,
    pub(crate) restart_number: BlockNumber,
    time_analyzer: TimeAnalyzer,
    pub(crate) adjustment: bool,
    pub(crate) protect_num: usize,
}

impl Default for InflightBlocks {
    fn default() -> Self {
        InflightBlocks {
            download_schedulers: HashMap::default(),
            inflight_states: BTreeMap::default(),
            trace_number: HashMap::default(),
            restart_number: 0,
            time_analyzer: TimeAnalyzer::default(),
            adjustment: true,
            protect_num: MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
        }
    }
}

struct DebugHashSet<'a>(&'a HashSet<BlockNumberAndHash>);

impl<'a> fmt::Debug for DebugHashSet<'a> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_set()
            .entries(self.0.iter().map(|h| format!("{}, {}", h.number, h.hash)))
            .finish()
    }
}

impl fmt::Debug for InflightBlocks {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_map()
            .entries(
                self.download_schedulers
                    .iter()
                    .map(|(k, v)| (k, DebugHashSet(&v.hashes))),
            )
            .finish()?;
        fmt.debug_map()
            .entries(
                self.inflight_states
                    .iter()
                    .map(|(k, v)| (format!("{}, {}", k.number, k.hash), v)),
            )
            .finish()?;
        self.time_analyzer.fmt(fmt)
    }
}

impl InflightBlocks {
    pub fn blocks_iter(&self) -> impl Iterator<Item = (&PeerIndex, &HashSet<BlockNumberAndHash>)> {
        self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes))
    }

    pub fn total_inflight_count(&self) -> usize {
        self.inflight_states.len()
    }

    pub fn division_point(&self) -> (u64, u64, u64) {
        (
            self.time_analyzer.fast_time,
            self.time_analyzer.normal_time,
            self.time_analyzer.low_time,
        )
    }

    pub fn peer_inflight_count(&self, peer: PeerIndex) -> usize {
        self.download_schedulers
            .get(&peer)
            .map(DownloadScheduler::inflight_count)
            .unwrap_or(0)
    }

    pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize {
        self.download_schedulers.get(&peer).map_or(
            INIT_BLOCKS_IN_TRANSIT_PER_PEER,
            DownloadScheduler::can_fetch,
        )
    }

    pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet<BlockNumberAndHash>> {
        self.download_schedulers.get(&peer).map(|d| &d.hashes)
    }

    pub fn inflight_state_by_block(&self, block: &BlockNumberAndHash) -> Option<&InflightState> {
        self.inflight_states.get(block)
    }

    pub fn mark_slow_block(&mut self, tip: BlockNumber) {
        let now = ckb_systemtime::unix_time_as_millis();
        for key in self.inflight_states.keys() {
            if key.number > tip + 1 {
                break;
            }
            self.trace_number.entry(key.clone()).or_insert(now);
        }
    }

    pub fn prune(&mut self, tip: BlockNumber) -> HashSet<PeerIndex> {
        let now = unix_time_as_millis();
        let mut disconnect_list = HashSet::new();
        let should_punish = self.download_schedulers.len() > self.protect_num;
        let adjustment = self.adjustment;

        let trace = &mut self.trace_number;
        let download_schedulers = &mut self.download_schedulers;
        let states = &mut self.inflight_states;

        let mut remove_key = Vec::new();
        let end = tip + 20;
        for (key, value) in states.iter() {
            if key.number > end {
                break;
            }
            if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now {
                if let Some(set) = download_schedulers.get_mut(&value.peer) {
                    set.hashes.remove(key);
                    if should_punish && adjustment {
                        set.punish(2);
                    }
                };
                if !trace.is_empty() {
                    trace.remove(key);
                }
                remove_key.push(key.clone());
                debug!(
                    "prune: remove InflightState: remove {}-{} from {}",
                    key.number, key.hash, value.peer
                );

                if let Some(metrics) = ckb_metrics::handle() {
                    metrics.ckb_inflight_timeout_count.inc();
                }
            }
        }

        for key in remove_key {
            states.remove(&key);
        }

        download_schedulers.retain(|k, v| {
            if v.task_count == 0 {
                disconnect_list.insert(*k);
                false
            } else {
                true
            }
        });
        shrink_to_fit!(download_schedulers, SHRINK_THRESHOLD);

        if self.restart_number != 0 && tip + 1 > self.restart_number {
            self.restart_number = 0;
        }

        let timeout_limit = self.time_analyzer.low_time;

        let restart_number = &mut self.restart_number;
        trace.retain(|key, time| {
            if now > timeout_limit + *time {
                if let Some(state) = states.remove(key) {
                    if let Some(d) = download_schedulers.get_mut(&state.peer) {
                        if should_punish && adjustment {
                            d.punish(1);
                        }
                        d.hashes.remove(key);
                        debug!(
                            "prune: remove download_schedulers: remove {}-{} from {}",
                            key.number, key.hash, state.peer
                        );
                    };
                }

                if key.number > *restart_number {
                    *restart_number = key.number;
                }
                return false;
            }
            true
        });
        shrink_to_fit!(trace, SHRINK_THRESHOLD);

        disconnect_list
    }

    pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool {
        let state = self.inflight_states.entry(block.clone());
        match state {
            Entry::Occupied(_entry) => return false,
            Entry::Vacant(entry) => entry.insert(InflightState::new(peer)),
        };

        if self.restart_number >= block.number {
            self.trace_number
                .insert(block.clone(), unix_time_as_millis());
        }

        let download_scheduler = self.download_schedulers.entry(peer).or_default();
        download_scheduler.hashes.insert(block)
    }

    pub fn remove_by_peer(&mut self, peer: PeerIndex) -> usize {
        let trace = &mut self.trace_number;
        let state = &mut self.inflight_states;

        self.download_schedulers
            .remove(&peer)
            .map(|blocks| {
                let blocks_count = blocks.hashes.len();
                for block in blocks.hashes {
                    state.remove(&block);
                    if !trace.is_empty() {
                        trace.remove(&block);
                    }
                }
                blocks_count
            })
            .unwrap_or_default()
    }

    pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool {
        let should_punish = self.download_schedulers.len() > self.protect_num;
        let download_schedulers = &mut self.download_schedulers;
        let trace = &mut self.trace_number;
        let time_analyzer = &mut self.time_analyzer;
        let adjustment = self.adjustment;
        self.inflight_states
            .remove(&block)
            .map(|state| {
                let elapsed = unix_time_as_millis().saturating_sub(state.timestamp);
                if let Some(set) = download_schedulers.get_mut(&state.peer) {
                    set.hashes.remove(&block);
                    if adjustment {
                        match time_analyzer.push_time(elapsed) {
                            TimeQuantile::MinToFast => set.increase(2),
                            TimeQuantile::FastToNormal => set.increase(1),
                            TimeQuantile::NormalToUpper => {
                                if should_punish {
                                    set.decrease(1)
                                }
                            }
                            TimeQuantile::UpperToMax => {
                                if should_punish {
                                    set.decrease(2)
                                }
                            }
                        }
                    }
                    if !trace.is_empty() {
                        trace.remove(&block);
                    }
                };
            })
            .is_some()
    }
}

impl Peers {
    pub fn sync_connected(
        &self,
        peer: PeerIndex,
        is_outbound: bool,
        is_whitelist: bool,
        is_2023edition: bool,
    ) {
        let protect_outbound = is_outbound
            && self
                .n_protected_outbound_peers
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
                    if x < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT {
                        Some(x + 1)
                    } else {
                        None
                    }
                })
                .is_ok();

        let peer_flags = PeerFlags {
            is_outbound,
            is_whitelist,
            is_2023edition,
            is_protect: protect_outbound,
        };
        self.state
            .entry(peer)
            .and_modify(|state| {
                state.peer_flags = peer_flags;
                state.sync_connected();
            })
            .or_insert_with(|| {
                let mut state = PeerState::new(peer_flags);
                state.sync_connected();
                state
            });
    }

    pub fn relay_connected(&self, peer: PeerIndex) {
        self.state
            .entry(peer)
            .or_insert_with(|| PeerState::new(PeerFlags::default()));
    }

    pub fn get_best_known_header(&self, pi: PeerIndex) -> Option<HeaderIndex> {
        self.state
            .get(&pi)
            .and_then(|peer_state| peer_state.best_known_header.clone())
    }

    pub fn may_set_best_known_header(&self, peer: PeerIndex, header_index: HeaderIndex) {
        if let Some(mut peer_state) = self.state.get_mut(&peer) {
            if let Some(ref known) = peer_state.best_known_header {
                if header_index.is_better_chain(known) {
                    peer_state.best_known_header = Some(header_index);
                }
            } else {
                peer_state.best_known_header = Some(header_index);
            }
        }
    }

    pub fn get_last_common_header(&self, pi: PeerIndex) -> Option<BlockNumberAndHash> {
        self.state
            .get(&pi)
            .and_then(|peer_state| peer_state.last_common_header.clone())
    }

    pub fn set_last_common_header(&self, pi: PeerIndex, header: BlockNumberAndHash) {
        self.state
            .entry(pi)
            .and_modify(|peer_state| peer_state.last_common_header = Some(header));
    }

    pub fn getheaders_received(&self, _peer: PeerIndex) {}

    pub fn disconnected(&self, peer: PeerIndex) {
        if let Some(peer_state) = self.state.remove(&peer).map(|(_, peer_state)| peer_state) {
            if peer_state.sync_started() {
                assert_ne!(
                    self.n_sync_started.fetch_sub(1, Ordering::AcqRel),
                    0,
                    "n_sync_started overflow when disconnects"
                );
            }

            if peer_state.peer_flags.is_protect {
                assert_ne!(
                    self.n_protected_outbound_peers
                        .fetch_sub(1, Ordering::AcqRel),
                    0,
                    "n_protected_outbound_peers overflow when disconnects"
                );
            }
        }
    }

    pub fn insert_unknown_header_hash(&self, peer: PeerIndex, hash: Byte32) {
        self.state
            .entry(peer)
            .and_modify(|state| state.unknown_header_list.push(hash));
    }

    pub fn unknown_header_list_is_empty(&self, peer: PeerIndex) -> bool {
        self.state
            .get(&peer)
            .map(|state| state.unknown_header_list.is_empty())
            .unwrap_or(true)
    }

    pub fn clear_unknown_list(&self) {
        self.state.iter_mut().for_each(|mut state| {
            if !state.unknown_header_list.is_empty() {
                state.unknown_header_list.clear()
            }
        })
    }

    pub fn get_best_known_less_than_tip_and_unknown_empty(
        &self,
        tip: BlockNumber,
    ) -> Vec<PeerIndex> {
        self.state
            .iter()
            .filter_map(|kv_pair| {
                let (peer_index, state) = kv_pair.pair();
                if !state.unknown_header_list.is_empty() {
                    return None;
                }
                if let Some(ref header) = state.best_known_header {
                    if header.number() < tip {
                        return Some(*peer_index);
                    }
                }
                None
            })
            .collect()
    }

    pub fn take_unknown_last(&self, peer: PeerIndex) -> Option<Byte32> {
        self.state
            .get_mut(&peer)
            .and_then(|mut state| state.unknown_header_list.pop())
    }

    pub fn get_flag(&self, peer: PeerIndex) -> Option<PeerFlags> {
        self.state.get(&peer).map(|state| state.peer_flags)
    }
}

pub(crate) type PendingCompactBlockMap = HashMap<
    Byte32,
    (
        packed::CompactBlock,
        HashMap<PeerIndex, (Vec<u32>, Vec<u32>)>,
        u64,
    ),
>;

#[derive(Clone)]
pub struct SyncShared {
    shared: Shared,
    state: Arc<SyncState>,
}

impl SyncShared {
    pub fn new(
        shared: Shared,
        sync_config: SyncConfig,
        tx_relay_receiver: Receiver<TxVerificationResult>,
    ) -> SyncShared {
        let (total_difficulty, header) = {
            let snapshot = shared.snapshot();
            (
                snapshot.total_difficulty().to_owned(),
                snapshot.tip_header().to_owned(),
            )
        };
        let shared_best_header = RwLock::new((header, total_difficulty).into());
        info!(
            "header_map.memory_limit {}",
            sync_config.header_map.memory_limit
        );

        let state = SyncState {
            shared_best_header,
            tx_filter: Mutex::new(TtlFilter::default()),
            unknown_tx_hashes: Mutex::new(KeyedPriorityQueue::new()),
            peers: Peers::default(),
            pending_get_block_proposals: DashMap::new(),
            pending_compact_blocks: Mutex::new(HashMap::default()),
            inflight_proposals: DashMap::new(),
            inflight_blocks: RwLock::new(InflightBlocks::default()),
            pending_get_headers: RwLock::new(LruCache::new(GET_HEADERS_CACHE_SIZE)),
            tx_relay_receiver,
            min_chain_work: sync_config.min_chain_work,
        };

        SyncShared {
            shared,
            state: Arc::new(state),
        }
    }

    pub fn shared(&self) -> &Shared {
        &self.shared
    }

    pub fn active_chain(&self) -> ActiveChain {
        ActiveChain {
            sync_shared: self.clone(),
            snapshot: Arc::clone(&self.shared.snapshot()),
        }
    }

    pub fn store(&self) -> &ChainDB {
        self.shared.store()
    }

    pub fn state(&self) -> &SyncState {
        &self.state
    }

    pub fn consensus(&self) -> &Consensus {
        self.shared.consensus()
    }

    #[cfg(test)]
    pub(crate) fn blocking_insert_new_block(
        &self,
        chain: &ChainController,
        block: Arc<core::BlockView>,
    ) -> VerifyResult {
        chain.blocking_process_block(block)
    }

    pub(crate) fn accept_remote_block(&self, chain: &ChainController, remote_block: RemoteBlock) {
        {
            let entry = self
                .shared()
                .block_status_map()
                .entry(remote_block.block.header().hash());
            if let dashmap::mapref::entry::Entry::Vacant(entry) = entry {
                entry.insert(BlockStatus::BLOCK_RECEIVED);
            }
        }

        chain.asynchronous_process_remote_block(remote_block)
    }

    pub fn insert_valid_header(&self, peer: PeerIndex, header: &core::HeaderView) {
        let tip_number = self.active_chain().tip_number();
        let store_first = tip_number >= header.number();
        let parent_hash = Byte32::from_slice(header.data().raw().parent_hash().as_slice())
            .expect("checked slice length");
        let parent_header_index = self
            .get_header_index_view(&parent_hash, store_first)
            .expect("parent should be verified");
        let mut header_view = HeaderIndexView::new(
            header.hash(),
            header.number(),
            header.epoch(),
            header.timestamp(),
            parent_hash,
            parent_header_index.total_difficulty() + header.difficulty(),
        );

        let snapshot = Arc::clone(&self.shared.snapshot());
        header_view.build_skip(
            tip_number,
            |hash, store_first| self.get_header_index_view(hash, store_first),
            |number, current| {
                if current.number <= snapshot.tip_number() && snapshot.is_main_chain(&current.hash)
                {
                    snapshot
                        .get_block_hash(number)
                        .and_then(|hash| self.get_header_index_view(&hash, true))
                } else {
                    None
                }
            },
        );
        self.shared.header_map().insert(header_view.clone());
        self.state
            .peers()
            .may_set_best_known_header(peer, header_view.as_header_index());
        self.state.may_set_shared_best_header(header_view);
    }

    pub(crate) fn get_header_index_view(
        &self,
        hash: &Byte32,
        store_first: bool,
    ) -> Option<HeaderIndexView> {
        let store = self.store();
        if store_first {
            store
                .get_block_header(hash)
                .and_then(|header| {
                    store
                        .get_block_ext(hash)
                        .map(|block_ext| (header, block_ext.total_difficulty).into())
                })
                .or_else(|| self.shared.header_map().get(hash))
        } else {
            self.shared.header_map().get(hash).or_else(|| {
                store.get_block_header(hash).and_then(|header| {
                    store
                        .get_block_ext(hash)
                        .map(|block_ext| (header, block_ext.total_difficulty).into())
                })
            })
        }
    }

    pub fn is_stored(&self, hash: &packed::Byte32) -> bool {
        let status = self.active_chain().get_block_status(hash);
        status.contains(BlockStatus::BLOCK_STORED)
    }

    pub fn get_epoch_ext(&self, hash: &Byte32) -> Option<EpochExt> {
        self.store().get_block_epoch(hash)
    }

    pub fn insert_peer_unknown_header_list(&self, pi: PeerIndex, header_list: Vec<Byte32>) {
        if self.state().peers.unknown_header_list_is_empty(pi) {
            for hash in header_list {
                if let Some(header) = self.shared().header_map().get(&hash) {
                    self.state()
                        .peers
                        .may_set_best_known_header(pi, header.as_header_index());
                    break;
                } else {
                    self.state().peers.insert_unknown_header_hash(pi, hash)
                }
            }
        }
    }

    pub fn new_block_received(&self, block: &core::BlockView) -> bool {
        if !self
            .state()
            .write_inflight_blocks()
            .remove_by_block((block.number(), block.hash()).into())
        {
            return false;
        }

        let status = self.active_chain().get_block_status(&block.hash());
        debug!(
            "new_block_received {}-{}, status: {:?}",
            block.number(),
            block.hash(),
            status
        );
        if !BlockStatus::HEADER_VALID.eq(&status) {
            return false;
        }

        if let dashmap::mapref::entry::Entry::Vacant(status) =
            self.shared().block_status_map().entry(block.hash())
        {
            status.insert(BlockStatus::BLOCK_RECEIVED);
            return true;
        }
        false
    }
}

impl HeaderFieldsProvider for SyncShared {
    fn get_header_fields(&self, hash: &Byte32) -> Option<HeaderFields> {
        self.shared
            .header_map()
            .get(hash)
            .map(|header| HeaderFields {
                hash: header.hash(),
                number: header.number(),
                epoch: header.epoch(),
                timestamp: header.timestamp(),
                parent_hash: header.parent_hash(),
            })
            .or_else(|| {
                self.store()
                    .get_block_header(hash)
                    .map(|header| HeaderFields {
                        hash: header.hash(),
                        number: header.number(),
                        epoch: header.epoch(),
                        timestamp: header.timestamp(),
                        parent_hash: header.parent_hash(),
                    })
            })
    }
}

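/// Priority entry for a transaction hash that has been announced but is not
/// yet known locally. `Ord` is reversed on `next_request_at()` so that the
/// `KeyedPriorityQueue` in `SyncState` pops the hash that is due to be
/// requested soonest. A behavior sketch (illustrative only; `peer_a` and
/// `peer_b` are hypothetical peer indices):
///
/// ```ignore
/// let mut p = UnknownTxHashPriority {
///     request_time: Instant::now(),
///     peers: vec![peer_a, peer_b],
///     requested: false,
/// };
/// assert_eq!(p.next_request_peer(), Some(peer_a)); // first ask goes to peer_a
/// // If no response arrives within RETRY_ASK_TX_TIMEOUT_INCREASE,
/// // the next call rotates to peer_b:
/// assert_eq!(p.next_request_peer(), Some(peer_b));
/// ```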
#[derive(Eq, PartialEq, Clone)]
pub struct UnknownTxHashPriority {
    request_time: Instant,
    peers: Vec<PeerIndex>,
    requested: bool,
}

impl UnknownTxHashPriority {
    pub fn should_request(&self, now: Instant) -> bool {
        self.next_request_at() < now
    }

    pub fn next_request_at(&self) -> Instant {
        if self.requested {
            self.request_time + RETRY_ASK_TX_TIMEOUT_INCREASE
        } else {
            self.request_time
        }
    }

    pub fn next_request_peer(&mut self) -> Option<PeerIndex> {
        if self.requested {
            if self.peers.len() > 1 {
                self.request_time = Instant::now();
                self.peers.swap_remove(0);
                self.peers.first().cloned()
            } else {
                None
            }
        } else {
            self.requested = true;
            self.peers.first().cloned()
        }
    }

    pub fn push_peer(&mut self, peer_index: PeerIndex) {
        self.peers.push(peer_index);
    }

    pub fn requesting_peer(&self) -> Option<PeerIndex> {
        if self.requested {
            self.peers.first().cloned()
        } else {
            None
        }
    }
}

impl Ord for UnknownTxHashPriority {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        self.next_request_at()
            .cmp(&other.next_request_at())
            .reverse()
    }
}

impl PartialOrd for UnknownTxHashPriority {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

pub struct SyncState {
    shared_best_header: RwLock<HeaderIndexView>,
    tx_filter: Mutex<TtlFilter<Byte32>>,

    unknown_tx_hashes: Mutex<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>>,

    peers: Peers,

    pending_get_block_proposals: DashMap<packed::ProposalShortId, HashSet<PeerIndex>>,
    pending_get_headers: RwLock<LruCache<(PeerIndex, Byte32), Instant>>,
    pending_compact_blocks: Mutex<PendingCompactBlockMap>,

    inflight_proposals: DashMap<packed::ProposalShortId, BlockNumber>,
    inflight_blocks: RwLock<InflightBlocks>,

    tx_relay_receiver: Receiver<TxVerificationResult>,
    min_chain_work: U256,
}

impl SyncState {
    pub fn min_chain_work(&self) -> &U256 {
        &self.min_chain_work
    }

    pub fn min_chain_work_ready(&self) -> bool {
        self.shared_best_header
            .read()
            .is_better_than(&self.min_chain_work)
    }

    pub fn n_sync_started(&self) -> &AtomicUsize {
        &self.peers.n_sync_started
    }

    pub fn peers(&self) -> &Peers {
        &self.peers
    }

    pub fn compare_with_pending_compact(&self, hash: &Byte32, now: u64) -> bool {
        let pending = self.pending_compact_blocks.lock();
        pending.is_empty()
            || pending
                .get(hash)
                .map(|(_, _, time)| now > time + 2000)
                .unwrap_or(true)
    }

    pub fn pending_compact_blocks(&self) -> MutexGuard<PendingCompactBlockMap> {
        self.pending_compact_blocks.lock()
    }

    pub fn read_inflight_blocks(&self) -> RwLockReadGuard<InflightBlocks> {
        self.inflight_blocks.read()
    }

    pub fn write_inflight_blocks(&self) -> RwLockWriteGuard<InflightBlocks> {
        self.inflight_blocks.write()
    }

    pub fn take_relay_tx_verify_results(&self, limit: usize) -> Vec<TxVerificationResult> {
        self.tx_relay_receiver.try_iter().take(limit).collect()
    }

    pub fn shared_best_header(&self) -> HeaderIndexView {
        self.shared_best_header.read().to_owned()
    }

    pub fn shared_best_header_ref(&self) -> RwLockReadGuard<HeaderIndexView> {
        self.shared_best_header.read()
    }

    pub fn may_set_shared_best_header(&self, header: HeaderIndexView) {
        let mut shared_best_header = self.shared_best_header.write();
        if !header.is_better_than(shared_best_header.total_difficulty()) {
            return;
        }

        if let Some(metrics) = ckb_metrics::handle() {
            metrics.ckb_shared_best_number.set(header.number() as i64);
        }
        *shared_best_header = header;
    }

    pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) {
        if peer_state.sync_started() {
            assert_ne!(
                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
                0,
                "n_sync_started overflow when suspend_sync"
            );
        }
        peer_state.suspend_sync(SUSPEND_SYNC_TIME);
    }

    pub(crate) fn tip_synced(&self, peer_state: &mut PeerState) {
        if peer_state.sync_started() {
            assert_ne!(
                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
                0,
                "n_sync_started overflow when tip_synced"
            );
        }
        peer_state.tip_synced();
    }

    pub fn mark_as_known_tx(&self, hash: Byte32) {
        self.mark_as_known_txs(iter::once(hash));
    }

    pub fn remove_from_known_txs(&self, hash: &Byte32) {
        self.tx_filter.lock().remove(hash);
    }

    pub fn mark_as_known_txs(&self, hashes: impl Iterator<Item = Byte32> + Clone) {
        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
        let mut tx_filter = self.tx_filter.lock();

        for hash in hashes {
            unknown_tx_hashes.remove(&hash);
            tx_filter.insert(hash);
        }
    }

    pub fn pop_ask_for_txs(&self) -> HashMap<PeerIndex, Vec<Byte32>> {
        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
        let mut result: HashMap<PeerIndex, Vec<Byte32>> = HashMap::new();
        let now = Instant::now();

        if !unknown_tx_hashes
            .peek()
            .map(|(_tx_hash, priority)| priority.should_request(now))
            .unwrap_or_default()
        {
            return result;
        }

        while let Some((tx_hash, mut priority)) = unknown_tx_hashes.pop() {
            if priority.should_request(now) {
                if let Some(peer_index) = priority.next_request_peer() {
                    result
                        .entry(peer_index)
                        .and_modify(|hashes| hashes.push(tx_hash.clone()))
                        .or_insert_with(|| vec![tx_hash.clone()]);
                    unknown_tx_hashes.push(tx_hash, priority);
                }
            } else {
                unknown_tx_hashes.push(tx_hash, priority);
                break;
            }
        }
        result
    }

    pub fn add_ask_for_txs(&self, peer_index: PeerIndex, tx_hashes: Vec<Byte32>) -> Status {
        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();

        for tx_hash in tx_hashes
            .into_iter()
            .take(MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER)
        {
            match unknown_tx_hashes.entry(tx_hash) {
                keyed_priority_queue::Entry::Occupied(entry) => {
                    let mut priority = entry.get_priority().clone();
                    priority.push_peer(peer_index);
                    entry.set_priority(priority);
                }
                keyed_priority_queue::Entry::Vacant(entry) => {
                    entry.set_priority(UnknownTxHashPriority {
                        request_time: Instant::now(),
                        peers: vec![peer_index],
                        requested: false,
                    })
                }
            }
        }

        if unknown_tx_hashes.len() >= MAX_UNKNOWN_TX_HASHES_SIZE
            || unknown_tx_hashes.len()
                >= self.peers.state.len() * MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER
        {
            warn!(
                "unknown_tx_hashes is too long, len: {}",
                unknown_tx_hashes.len()
            );

            let mut peer_unknown_counter = 0;
            for (_hash, priority) in unknown_tx_hashes.iter() {
                for peer in priority.peers.iter() {
                    if *peer == peer_index {
                        peer_unknown_counter += 1;
                    }
                }
            }
            if peer_unknown_counter >= MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER {
                return StatusCode::TooManyUnknownTransactions.into();
            }

            return Status::ignored();
        }

        Status::ok()
    }

    pub fn already_known_tx(&self, hash: &Byte32) -> bool {
        self.tx_filter.lock().contains(hash)
    }

    pub fn tx_filter(&self) -> MutexGuard<TtlFilter<Byte32>> {
        self.tx_filter.lock()
    }

    pub fn unknown_tx_hashes(
        &self,
    ) -> MutexGuard<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>> {
        self.unknown_tx_hashes.lock()
    }

    pub fn insert_inflight_proposals(
        &self,
        ids: Vec<packed::ProposalShortId>,
        block_number: BlockNumber,
    ) -> Vec<bool> {
        ids.into_iter()
            .map(|id| match self.inflight_proposals.entry(id) {
                dashmap::mapref::entry::Entry::Occupied(mut occupied) => {
                    if *occupied.get() < block_number {
                        occupied.insert(block_number);
                        true
                    } else {
                        false
                    }
                }
                dashmap::mapref::entry::Entry::Vacant(vacant) => {
                    vacant.insert(block_number);
                    true
                }
            })
            .collect()
    }

    pub fn remove_inflight_proposals(&self, ids: &[packed::ProposalShortId]) -> Vec<bool> {
        ids.iter()
            .map(|id| self.inflight_proposals.remove(id).is_some())
            .collect()
    }

    pub fn clear_expired_inflight_proposals(&self, keep_min_block_number: BlockNumber) {
        self.inflight_proposals
            .retain(|_, block_number| *block_number >= keep_min_block_number);
    }

    pub fn contains_inflight_proposal(&self, proposal_id: &packed::ProposalShortId) -> bool {
        self.inflight_proposals.contains_key(proposal_id)
    }

    pub fn drain_get_block_proposals(
        &self,
    ) -> DashMap<packed::ProposalShortId, HashSet<PeerIndex>> {
        let ret = self.pending_get_block_proposals.clone();
        self.pending_get_block_proposals.clear();
        ret
    }

    pub fn insert_get_block_proposals(&self, pi: PeerIndex, ids: Vec<packed::ProposalShortId>) {
        for id in ids.into_iter() {
            self.pending_get_block_proposals
                .entry(id)
                .or_default()
                .insert(pi);
        }
    }

    pub fn disconnected(&self, pi: PeerIndex) {
        let removed_inflight_blocks_count = self.write_inflight_blocks().remove_by_peer(pi);
        if removed_inflight_blocks_count > 0 {
            debug!(
                "disconnected {}, remove {} inflight blocks",
                pi, removed_inflight_blocks_count
            )
        }
        self.peers().disconnected(pi);
    }
}

#[derive(Clone)]
pub struct ActiveChain {
    sync_shared: SyncShared,
    snapshot: Arc<Snapshot>,
}

#[doc(hidden)]
impl ActiveChain {
    pub(crate) fn sync_shared(&self) -> &SyncShared {
        &self.sync_shared
    }

    pub fn shared(&self) -> &Shared {
        self.sync_shared.shared()
    }

    fn store(&self) -> &ChainDB {
        self.sync_shared.store()
    }

    pub fn state(&self) -> &SyncState {
        self.sync_shared.state()
    }

    fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }

    pub fn get_block_hash(&self, number: BlockNumber) -> Option<packed::Byte32> {
        self.snapshot().get_block_hash(number)
    }

    pub fn get_block(&self, h: &packed::Byte32) -> Option<core::BlockView> {
        self.store().get_block(h)
    }

    pub fn get_block_header(&self, h: &packed::Byte32) -> Option<core::HeaderView> {
        self.store().get_block_header(h)
    }

    pub fn get_block_ext(&self, h: &packed::Byte32) -> Option<core::BlockExt> {
        self.snapshot().get_block_ext(h)
    }

    pub fn get_block_filter(&self, hash: &packed::Byte32) -> Option<packed::Bytes> {
        self.store().get_block_filter(hash)
    }

    pub fn get_block_filter_hash(&self, hash: &packed::Byte32) -> Option<packed::Byte32> {
        self.store().get_block_filter_hash(hash)
    }

    pub fn get_latest_built_filter_block_number(&self) -> BlockNumber {
        self.snapshot
            .get_latest_built_filter_data_block_hash()
            .and_then(|hash| self.snapshot.get_block_number(&hash))
            .unwrap_or_default()
    }

    pub fn total_difficulty(&self) -> &U256 {
        self.snapshot.total_difficulty()
    }

    pub fn tip_header(&self) -> core::HeaderView {
        self.snapshot.tip_header().clone()
    }

    pub fn tip_hash(&self) -> Byte32 {
        self.snapshot.tip_hash()
    }

    pub fn tip_number(&self) -> BlockNumber {
        self.snapshot.tip_number()
    }

    pub fn epoch_ext(&self) -> core::EpochExt {
        self.snapshot.epoch_ext().clone()
    }

    pub fn is_main_chain(&self, hash: &packed::Byte32) -> bool {
        self.snapshot.is_main_chain(hash)
    }

    pub fn is_unverified_chain(&self, hash: &packed::Byte32) -> bool {
        self.store().get_block_epoch_index(hash).is_some()
    }

    pub fn is_initial_block_download(&self) -> bool {
        self.shared().is_initial_block_download()
    }

    pub fn unverified_tip_header(&self) -> HeaderIndex {
        self.shared().get_unverified_tip()
    }

    pub fn unverified_tip_hash(&self) -> Byte32 {
        self.unverified_tip_header().hash()
    }

    pub fn unverified_tip_number(&self) -> BlockNumber {
        self.unverified_tip_header().number()
    }

    pub fn get_ancestor(&self, base: &Byte32, number: BlockNumber) -> Option<HeaderIndexView> {
        self.get_ancestor_internal(base, number, false)
    }

    pub fn get_ancestor_with_unverified(
        &self,
        base: &Byte32,
        number: BlockNumber,
    ) -> Option<HeaderIndexView> {
        self.get_ancestor_internal(base, number, true)
    }

    fn get_ancestor_internal(
        &self,
        base: &Byte32,
        number: BlockNumber,
        with_unverified: bool,
    ) -> Option<HeaderIndexView> {
        let tip_number = {
            if with_unverified {
                self.unverified_tip_number()
            } else {
                self.tip_number()
            }
        };

        let block_is_on_chain_fn = |hash: &Byte32| {
            if with_unverified {
                self.is_unverified_chain(hash)
            } else {
                self.is_main_chain(hash)
            }
        };

        let get_header_view_fn = |hash: &Byte32, store_first: bool| {
            self.sync_shared.get_header_index_view(hash, store_first)
        };

        let fast_scanner_fn = |number: BlockNumber, current: BlockNumberAndHash| {
            if current.number <= tip_number && block_is_on_chain_fn(&current.hash) {
                self.get_block_hash(number)
                    .and_then(|hash| self.sync_shared.get_header_index_view(&hash, true))
            } else {
                None
            }
        };

        self.sync_shared
            .get_header_index_view(base, false)?
            .get_ancestor(tip_number, number, get_header_view_fn, fast_scanner_fn)
    }

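    /// Builds a block locator for `GetHeaders`. The first ~10 entries walk
    /// back one block at a time, after which the step doubles per entry, and
    /// the genesis hash is always the last element. An illustrative shape
    /// (hypothetical start height of 1000):
    ///
    /// ```text
    /// heights: 1000, 999, ..., 991, 989, 985, 977, 961, 929, ..., genesis
    /// ```
    ///
    /// When the remaining height is still more than `ONE_DAY_BLOCK_NUMBER`
    /// blocks above genesis, extra halved-height entries are appended (up to
    /// 52 in total) to speed up locating the fork point after a deep reorg.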
    pub fn get_locator(&self, start: BlockNumberAndHash) -> Vec<Byte32> {
        let mut step = 1;
        let mut locator = Vec::with_capacity(32);
        let mut index = start.number();
        let mut base = start.hash();

        loop {
            let header_hash = self
                .get_ancestor(&base, index)
                .unwrap_or_else(|| {
                    panic!(
                        "index calculated in get_locator: \
                         start: {:?}, base: {}, step: {}, locators({}): {:?}.",
                        start,
                        base,
                        step,
                        locator.len(),
                        locator,
                    )
                })
                .hash();
            locator.push(header_hash.clone());

            if locator.len() >= 10 {
                step <<= 1;
            }

            if index < step * 2 {
                if locator.len() < 52 && index > ONE_DAY_BLOCK_NUMBER {
                    index >>= 1;
                    base = header_hash;
                    continue;
                }
                if index != 0 {
                    locator.push(self.sync_shared.consensus().genesis_hash());
                }
                break;
            }
            index -= step;
            base = header_hash;
        }
        locator
    }

    pub fn last_common_ancestor(
        &self,
        pa: &BlockNumberAndHash,
        pb: &BlockNumberAndHash,
    ) -> Option<BlockNumberAndHash> {
        let (mut m_left, mut m_right) = if pa.number() > pb.number() {
            (pb.clone(), pa.clone())
        } else {
            (pa.clone(), pb.clone())
        };

        m_right = self
            .get_ancestor(&m_right.hash(), m_left.number())?
            .number_and_hash();
        if m_left == m_right {
            return Some(m_left);
        }
        debug_assert!(m_left.number() == m_right.number());

        while m_left != m_right {
            m_left = self
                .get_ancestor(&m_left.hash(), m_left.number() - 1)?
                .number_and_hash();
            m_right = self
                .get_ancestor(&m_right.hash(), m_right.number() - 1)?
                .number_and_hash();
        }
        Some(m_left)
    }

    pub fn locate_latest_common_block(
        &self,
        _hash_stop: &Byte32,
        locator: &[Byte32],
    ) -> Option<BlockNumber> {
        if locator.is_empty() {
            return None;
        }

        let locator_hash = locator.last().expect("empty checked");
        if locator_hash != &self.sync_shared.consensus().genesis_hash() {
            return None;
        }

        let (index, latest_common) = locator
            .iter()
            .enumerate()
            .map(|(index, hash)| (index, self.snapshot.get_block_number(hash)))
            .find(|(_index, number)| number.is_some())
            .expect("locator last checked");

        if index == 0 || latest_common == Some(0) {
            return latest_common;
        }

        if let Some(header) = locator
            .get(index - 1)
            .and_then(|hash| self.sync_shared.store().get_block_header(hash))
        {
            let mut block_hash = header.data().raw().parent_hash();
            loop {
                let block_header = match self.sync_shared.store().get_block_header(&block_hash) {
                    None => break latest_common,
                    Some(block_header) => block_header,
                };

                if let Some(block_number) = self.snapshot.get_block_number(&block_hash) {
                    return Some(block_number);
                }

                block_hash = block_header.data().raw().parent_hash();
            }
        } else {
            latest_common
        }
    }

    pub fn get_locator_response(
        &self,
        block_number: BlockNumber,
        hash_stop: &Byte32,
    ) -> Vec<core::HeaderView> {
        let tip_number = self.tip_header().number();
        let max_height = cmp::min(
            block_number + 1 + MAX_HEADERS_LEN as BlockNumber,
            tip_number + 1,
        );
        (block_number + 1..max_height)
            .filter_map(|block_number| self.snapshot.get_block_hash(block_number))
            .take_while(|block_hash| block_hash != hash_stop)
            .filter_map(|block_hash| self.sync_shared.store().get_block_header(&block_hash))
            .collect()
    }

    pub fn send_getheaders_to_peer(
        &self,
        nc: &dyn CKBProtocolContext,
        peer: PeerIndex,
        block_number_and_hash: BlockNumberAndHash,
    ) {
        if let Some(last_time) = self
            .state()
            .pending_get_headers
            .write()
            .get(&(peer, block_number_and_hash.hash()))
        {
            if Instant::now() < *last_time + GET_HEADERS_TIMEOUT {
                debug!(
                    "Last get_headers request to peer {} was within {:?}; ignore this one.",
                    peer, GET_HEADERS_TIMEOUT,
                );
                return;
            } else {
                debug!(
                    "Could not get headers from {} within {:?}; retrying.",
                    peer, GET_HEADERS_TIMEOUT,
                );
            }
        }
        self.state()
            .pending_get_headers
            .write()
            .put((peer, block_number_and_hash.hash()), Instant::now());

        debug!(
            "send_getheaders_to_peer peer={}, hash={}",
            peer,
            block_number_and_hash.hash()
        );
        let locator_hash = self.get_locator(block_number_and_hash);
        let content = packed::GetHeaders::new_builder()
            .block_locator_hashes(locator_hash)
            .hash_stop(packed::Byte32::zero())
            .build();
        let message = packed::SyncMessage::new_builder().set(content).build();
        let _status = send_message(SupportProtocols::Sync.protocol_id(), nc, peer, &message);
    }

    pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
        self.shared().get_block_status(block_hash)
    }

    pub fn contains_block_status(&self, block_hash: &Byte32, status: BlockStatus) -> bool {
        self.get_block_status(block_hash).contains(status)
    }
}

#[derive(Clone, Copy, Debug)]
pub enum IBDState {
    In,
    Out,
}

impl From<bool> for IBDState {
    fn from(src: bool) -> Self {
        if src { IBDState::In } else { IBDState::Out }
    }
}

impl From<IBDState> for bool {
    fn from(s: IBDState) -> bool {
        match s {
            IBDState::In => true,
            IBDState::Out => false,
        }
    }
}

pub(crate) fn post_sync_process(
    nc: &dyn CKBProtocolContext,
    peer: PeerIndex,
    item_name: &str,
    status: Status,
) {
    if let Some(ban_time) = status.should_ban() {
        error!(
            "Receive {} from {}. Ban {:?} for {}",
            item_name, peer, ban_time, status
        );
        nc.ban_peer(peer, ban_time, status.to_string());
    } else if status.should_warn() {
        warn!("Receive {} from {}, {}", item_name, peer, status);
    } else if !status.is_ok() {
        debug!("Receive {} from {}, {}", item_name, peer, status);
    }
}