use crate::{
    FAST_INDEX, LOW_INDEX, NORMAL_INDEX, Status, StatusCode, TIME_TRACE_SIZE,
    utils::async_send_message,
};
use ckb_app_config::SyncConfig;
#[cfg(test)]
use ckb_chain::VerifyResult;
use ckb_chain::{ChainController, RemoteBlock};
use ckb_chain_spec::consensus::{Consensus, MAX_BLOCK_INTERVAL, MIN_BLOCK_INTERVAL};
use ckb_channel::Receiver;
use ckb_constant::sync::{
    BLOCK_DOWNLOAD_TIMEOUT, HEADERS_DOWNLOAD_HEADERS_PER_SECOND, HEADERS_DOWNLOAD_INSPECT_WINDOW,
    HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE, INIT_BLOCKS_IN_TRANSIT_PER_PEER,
    MAX_BLOCKS_IN_TRANSIT_PER_PEER, MAX_HEADERS_LEN, MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
    MAX_UNKNOWN_TX_HASHES_SIZE, MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER, POW_INTERVAL,
    RETRY_ASK_TX_TIMEOUT_INCREASE, SUSPEND_SYNC_TIME,
};
use ckb_logger::{debug, error, info, trace, warn};
use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
use ckb_shared::{
    Snapshot,
    block_status::BlockStatus,
    shared::Shared,
    types::{HeaderIndex, HeaderIndexView, SHRINK_THRESHOLD},
};
use ckb_store::{ChainDB, ChainStore};
use ckb_systemtime::unix_time_as_millis;
use ckb_traits::{HeaderFields, HeaderFieldsProvider};
use ckb_tx_pool::service::TxVerificationResult;
use ckb_types::BlockNumberAndHash;
use ckb_types::{
    U256,
    core::{self, BlockNumber, EpochExt},
    packed::{self, Byte32},
    prelude::*,
};
use ckb_util::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard, shrink_to_fit};
use dashmap::{self, DashMap};
use keyed_priority_queue::{self, KeyedPriorityQueue};
use lru::LruCache;
use std::collections::{BTreeMap, HashMap, HashSet, btree_map::Entry};
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use std::{cmp, fmt, iter};

const GET_HEADERS_CACHE_SIZE: usize = 10000;
const GET_HEADERS_TIMEOUT: Duration = Duration::from_secs(15);
const FILTER_SIZE: usize = 50000;
const ONE_DAY_BLOCK_NUMBER: u64 = 8192;
pub(crate) const FILTER_TTL: u64 = 4 * 60 * 60;

#[derive(Clone, Debug, Default)]
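/// Headers-sync bookkeeping for one peer: the sync timeout bound, the last
/// reported work header and total difficulty, and the headers-sync state
/// machine driving `GetHeaders` requests.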
pub struct ChainSyncState {
    pub timeout: u64,
    pub work_header: Option<core::HeaderView>,
    pub total_difficulty: Option<U256>,
    pub sent_getheaders: bool,
    headers_sync_state: HeadersSyncState,
}

impl ChainSyncState {
    fn can_start_sync(&self, now: u64) -> bool {
        match self.headers_sync_state {
            HeadersSyncState::Initialized => false,
            HeadersSyncState::SyncProtocolConnected => true,
            HeadersSyncState::Started => false,
            HeadersSyncState::Suspend(until) | HeadersSyncState::TipSynced(until) => until < now,
        }
    }

    fn connected(&mut self) {
        self.headers_sync_state = HeadersSyncState::SyncProtocolConnected;
    }

    fn start(&mut self) {
        self.headers_sync_state = HeadersSyncState::Started
    }

    fn suspend(&mut self, until: u64) {
        self.headers_sync_state = HeadersSyncState::Suspend(until)
    }

    fn tip_synced(&mut self) {
        let now = unix_time_as_millis();
        let avg_interval = (MAX_BLOCK_INTERVAL + MIN_BLOCK_INTERVAL) / 2;
        self.headers_sync_state = HeadersSyncState::TipSynced(now + avg_interval * 1000);
    }

    fn started(&self) -> bool {
        matches!(self.headers_sync_state, HeadersSyncState::Started)
    }

    fn started_or_tip_synced(&self) -> bool {
        matches!(
            self.headers_sync_state,
            HeadersSyncState::Started | HeadersSyncState::TipSynced(_)
        )
    }
}

#[derive(Default, Clone, Debug)]
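/// The headers-sync state machine for a peer:
/// `Initialized -> SyncProtocolConnected -> Started`, ending in either
/// `Suspend(until)` or `TipSynced(until)`; both timed states allow a restart
/// once `until` (in milliseconds) has passed.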
enum HeadersSyncState {
    #[default]
    Initialized,
    SyncProtocolConnected,
    Started,
    Suspend(u64),
    TipSynced(u64),
}

#[derive(Clone, Default, Debug, Copy)]
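/// Static attributes of a peer connection: direction, eviction protection,
/// whitelist membership, and CKB 2023 edition support.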
pub struct PeerFlags {
    pub is_outbound: bool,
    pub is_protect: bool,
    pub is_whitelist: bool,
    pub is_2023edition: bool,
}

#[derive(Clone, Default, Debug, Copy)]
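/// Progress tracking for headers download from one peer: when syncing started
/// and when progress was last observed (both as wall-clock and tip timestamps
/// in milliseconds), plus whether the tip is already close to real time.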
pub struct HeadersSyncController {
    pub(crate) started_ts: u64,
    pub(crate) started_tip_ts: u64,

    pub(crate) last_updated_ts: u64,
    pub(crate) last_updated_tip_ts: u64,

    pub(crate) is_close_to_the_end: bool,
}

impl HeadersSyncController {
    #[cfg(test)]
    pub(crate) fn new(
        started_ts: u64,
        started_tip_ts: u64,
        last_updated_ts: u64,
        last_updated_tip_ts: u64,
        is_close_to_the_end: bool,
    ) -> Self {
        Self {
            started_ts,
            started_tip_ts,
            last_updated_ts,
            last_updated_tip_ts,
            is_close_to_the_end,
        }
    }

    pub(crate) fn from_header(better_tip_header: &HeaderIndexView) -> Self {
        let started_ts = unix_time_as_millis();
        let started_tip_ts = better_tip_header.timestamp();
        Self {
            started_ts,
            started_tip_ts,
            last_updated_ts: started_ts,
            last_updated_tip_ts: started_tip_ts,
            is_close_to_the_end: false,
        }
    }

    #[allow(clippy::wrong_self_convention)]
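    /// Returns `Some(true)` when headers download from this peer should be
    /// treated as timed out, `Some(false)` when the speed is acceptable, and
    /// `None` after the controller resets itself because the node has fallen
    /// significantly behind the tip again.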
    pub(crate) fn is_timeout(&mut self, now_tip_ts: u64, now: u64) -> Option<bool> {
        let inspect_window = HEADERS_DOWNLOAD_INSPECT_WINDOW;
        let expected_headers_per_sec = HEADERS_DOWNLOAD_HEADERS_PER_SECOND;
        let tolerable_bias = HEADERS_DOWNLOAD_TOLERABLE_BIAS_FOR_SINGLE_SAMPLE;

        let expected_before_finished = now.saturating_sub(now_tip_ts);

        trace!("headers-sync: better tip ts {}; now {}", now_tip_ts, now);

        if self.is_close_to_the_end {
            let expected_in_base_time =
                expected_headers_per_sec * inspect_window * POW_INTERVAL / 1000;
            if expected_before_finished > expected_in_base_time {
                self.started_ts = now;
                self.started_tip_ts = now_tip_ts;
                self.last_updated_ts = now;
                self.last_updated_tip_ts = now_tip_ts;
                self.is_close_to_the_end = false;
                trace!(
                    "headers-sync: send GetHeaders again since we are significantly behind the tip"
                );
                None
            } else {
                Some(false)
            }
        } else if expected_before_finished < inspect_window {
            self.is_close_to_the_end = true;
            trace!("headers-sync: ignore timeout because the tip almost reaches the real time");
            Some(false)
        } else {
            let spent_since_last_updated = now.saturating_sub(self.last_updated_ts);

            if spent_since_last_updated < inspect_window {
                Some(false)
            } else {
                let synced_since_last_updated = now_tip_ts.saturating_sub(self.last_updated_tip_ts);
                let expected_since_last_updated =
                    expected_headers_per_sec * spent_since_last_updated * POW_INTERVAL / 1000;

                if synced_since_last_updated < expected_since_last_updated / tolerable_bias {
                    trace!("headers-sync: the instantaneous speed is too slow");
                    Some(true)
                } else {
                    self.last_updated_ts = now;
                    self.last_updated_tip_ts = now_tip_ts;

                    if synced_since_last_updated > expected_since_last_updated {
                        trace!("headers-sync: the instantaneous speed is acceptable");
                        Some(false)
                    } else {
                        let spent_since_started = now.saturating_sub(self.started_ts);
                        let synced_since_started = now_tip_ts.saturating_sub(self.started_tip_ts);

                        let expected_since_started =
                            expected_headers_per_sec * spent_since_started * POW_INTERVAL / 1000;

                        if synced_since_started < expected_since_started {
                            trace!(
                                "headers-sync: both the global average speed and the instantaneous speed \
                                 are slower than expected"
                            );
                            Some(true)
                        } else {
                            trace!("headers-sync: the global average speed is acceptable");
                            Some(false)
                        }
                    }
                }
            }
        }
    }
}

#[derive(Clone, Default, Debug)]
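/// Everything the sync protocol tracks per peer: flags, headers-sync state,
/// the best known and last common headers, and header hashes reported by the
/// peer that we could not resolve yet.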
pub struct PeerState {
    pub headers_sync_controller: Option<HeadersSyncController>,
    pub peer_flags: PeerFlags,
    pub chain_sync: ChainSyncState,
    pub best_known_header: Option<HeaderIndex>,
    pub last_common_header: Option<BlockNumberAndHash>,
    pub unknown_header_list: Vec<Byte32>,
}

impl PeerState {
    pub fn new(peer_flags: PeerFlags) -> PeerState {
        PeerState {
            headers_sync_controller: None,
            peer_flags,
            chain_sync: ChainSyncState::default(),
            best_known_header: None,
            last_common_header: None,
            unknown_header_list: Vec::new(),
        }
    }

    pub fn can_start_sync(&self, now: u64, ibd: bool) -> bool {
        ((self.peer_flags.is_protect || self.peer_flags.is_whitelist) || !ibd)
            && self.chain_sync.can_start_sync(now)
    }

    pub fn start_sync(&mut self, headers_sync_controller: HeadersSyncController) {
        self.chain_sync.start();
        self.headers_sync_controller = Some(headers_sync_controller);
    }

    fn suspend_sync(&mut self, suspend_time: u64) {
        let now = unix_time_as_millis();
        self.chain_sync.suspend(now + suspend_time);
        self.headers_sync_controller = None;
    }

    fn tip_synced(&mut self) {
        self.chain_sync.tip_synced();
        self.headers_sync_controller = None;
    }

    pub(crate) fn sync_started(&self) -> bool {
        self.chain_sync.started()
    }

    pub(crate) fn started_or_tip_synced(&self) -> bool {
        self.chain_sync.started_or_tip_synced()
    }

    pub(crate) fn sync_connected(&mut self) {
        self.chain_sync.connected()
    }
}

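/// A TTL filter over an LRU cache: an entry leaves the filter either by LRU
/// eviction or once it is older than `ttl` seconds (via `remove_expired`).
///
/// A minimal usage sketch (hypothetical values, not taken from the callers):
///
/// ```ignore
/// let mut filter: TtlFilter<u32> = TtlFilter::new(16, 60);
/// assert!(filter.insert(1));   // newly inserted entries return true
/// assert!(filter.contains(&1));
/// assert!(!filter.insert(1));  // re-inserting an existing entry returns false
/// ```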
pub struct TtlFilter<T> {
    inner: LruCache<T, u64>,
    ttl: u64,
}

impl<T: Eq + Hash + Clone> Default for TtlFilter<T> {
    fn default() -> Self {
        TtlFilter::new(FILTER_SIZE, FILTER_TTL)
    }
}

impl<T: Eq + Hash + Clone> TtlFilter<T> {
    pub fn new(size: usize, ttl: u64) -> Self {
        Self {
            inner: LruCache::new(size),
            ttl,
        }
    }

    pub fn contains(&self, item: &T) -> bool {
        self.inner.contains(item)
    }

    pub fn insert(&mut self, item: T) -> bool {
        let now = ckb_systemtime::unix_time().as_secs();
        self.inner.put(item, now).is_none()
    }

    pub fn remove(&mut self, item: &T) -> bool {
        self.inner.pop(item).is_some()
    }

    pub fn remove_expired(&mut self) {
        let now = ckb_systemtime::unix_time().as_secs();
        let expired_keys: Vec<T> = self
            .inner
            .iter()
            .filter_map(|(key, time)| {
                if *time + self.ttl < now {
                    Some(key)
                } else {
                    None
                }
            })
            .cloned()
            .collect();

        for k in expired_keys {
            self.remove(&k);
        }
    }
}

#[derive(Default)]
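/// Registry of all peer states, plus counters for peers with headers-sync
/// started and outbound peers protected from disconnection.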
pub struct Peers {
    pub state: DashMap<PeerIndex, PeerState>,
    pub n_sync_started: AtomicUsize,
    pub n_protected_outbound_peers: AtomicUsize,
}

#[derive(Debug, Clone)]
pub struct InflightState {
    pub(crate) peer: PeerIndex,
    pub(crate) timestamp: u64,
}

impl InflightState {
    fn new(peer: PeerIndex) -> Self {
        Self {
            peer,
            timestamp: unix_time_as_millis(),
        }
    }
}

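/// The band a block download time falls into, relative to the thresholds
/// maintained by `TimeAnalyzer`.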
enum TimeQuantile {
    MinToFast,
    FastToNormal,
    NormalToUpper,
    UpperToMax,
}

#[derive(Clone)]
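/// Rolling statistics over the last `TIME_TRACE_SIZE` block download times.
/// Whenever the trace fills up it is sorted and the `fast_time`, `normal_time`
/// and `low_time` thresholds are averaged with the samples at `FAST_INDEX`,
/// `NORMAL_INDEX` and `LOW_INDEX`; `push_time` reports which band a new
/// sample falls into.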
struct TimeAnalyzer {
    trace: [u64; TIME_TRACE_SIZE],
    index: usize,
    fast_time: u64,
    normal_time: u64,
    low_time: u64,
}

impl fmt::Debug for TimeAnalyzer {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("TimeAnalyzer")
            .field("fast_time", &self.fast_time)
            .field("normal_time", &self.normal_time)
            .field("low_time", &self.low_time)
            .finish()
    }
}

impl Default for TimeAnalyzer {
    fn default() -> Self {
        Self {
            trace: [0; TIME_TRACE_SIZE],
            index: 0,
            fast_time: 1000,
            normal_time: 1250,
            low_time: 1500,
        }
    }
}

impl TimeAnalyzer {
    fn push_time(&mut self, time: u64) -> TimeQuantile {
        if self.index < TIME_TRACE_SIZE {
            self.trace[self.index] = time;
            self.index += 1;
        } else {
            self.trace.sort_unstable();
            self.fast_time = (self.fast_time.saturating_add(self.trace[FAST_INDEX])) >> 1;
            self.normal_time = (self.normal_time.saturating_add(self.trace[NORMAL_INDEX])) >> 1;
            self.low_time = (self.low_time.saturating_add(self.trace[LOW_INDEX])) >> 1;
            self.trace[0] = time;
            self.index = 1;
        }

        if time <= self.fast_time {
            TimeQuantile::MinToFast
        } else if time <= self.normal_time {
            TimeQuantile::FastToNormal
        } else if time > self.low_time {
            TimeQuantile::UpperToMax
        } else {
            TimeQuantile::NormalToUpper
        }
    }
}

#[derive(Debug, Clone)]
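/// Per-peer block download quota: how many block requests may be in flight
/// (`task_count`), grown or shrunk according to observed download times.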
pub struct DownloadScheduler {
    task_count: usize,
    timeout_count: usize,
    hashes: HashSet<BlockNumberAndHash>,
}

impl Default for DownloadScheduler {
    fn default() -> Self {
        Self {
            hashes: HashSet::default(),
            task_count: INIT_BLOCKS_IN_TRANSIT_PER_PEER,
            timeout_count: 0,
        }
    }
}

impl DownloadScheduler {
    fn inflight_count(&self) -> usize {
        self.hashes.len()
    }

    fn can_fetch(&self) -> usize {
        self.task_count.saturating_sub(self.hashes.len())
    }

    pub(crate) const fn task_count(&self) -> usize {
        self.task_count
    }

    fn increase(&mut self, num: usize) {
        if self.task_count < MAX_BLOCKS_IN_TRANSIT_PER_PEER {
            self.task_count = ::std::cmp::min(
                self.task_count.saturating_add(num),
                MAX_BLOCKS_IN_TRANSIT_PER_PEER,
            )
        }
    }

    fn decrease(&mut self, num: usize) {
        // Accumulate timeouts; shrink the task quota after repeated timeouts.
        self.timeout_count = self.timeout_count.saturating_add(num);
        if self.timeout_count > 2 {
            self.task_count = self.task_count.saturating_sub(1);
            self.timeout_count = 0;
        }
    }

    fn punish(&mut self, exp: usize) {
        self.task_count >>= exp
    }
}

#[derive(Clone)]
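/// All blocks currently being downloaded: the owning peer and timestamp per
/// block, per-peer schedulers, and trace data used for timeout detection and
/// download restarts.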
pub struct InflightBlocks {
    pub(crate) download_schedulers: HashMap<PeerIndex, DownloadScheduler>,
    inflight_states: BTreeMap<BlockNumberAndHash, InflightState>,
    pub(crate) trace_number: HashMap<BlockNumberAndHash, u64>,
    pub(crate) restart_number: BlockNumber,
    time_analyzer: TimeAnalyzer,
    pub(crate) adjustment: bool,
    pub(crate) protect_num: usize,
}

impl Default for InflightBlocks {
    fn default() -> Self {
        InflightBlocks {
            download_schedulers: HashMap::default(),
            inflight_states: BTreeMap::default(),
            trace_number: HashMap::default(),
            restart_number: 0,
            time_analyzer: TimeAnalyzer::default(),
            adjustment: true,
            protect_num: MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT,
        }
    }
}

struct DebugHashSet<'a>(&'a HashSet<BlockNumberAndHash>);

impl<'a> fmt::Debug for DebugHashSet<'a> {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_set()
            .entries(self.0.iter().map(|h| format!("{}, {}", h.number, h.hash)))
            .finish()
    }
}

impl fmt::Debug for InflightBlocks {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_map()
            .entries(
                self.download_schedulers
                    .iter()
                    .map(|(k, v)| (k, DebugHashSet(&v.hashes))),
            )
            .finish()?;
        fmt.debug_map()
            .entries(
                self.inflight_states
                    .iter()
                    .map(|(k, v)| (format!("{}, {}", k.number, k.hash), v)),
            )
            .finish()?;
        self.time_analyzer.fmt(fmt)
    }
}

impl InflightBlocks {
    pub fn blocks_iter(&self) -> impl Iterator<Item = (&PeerIndex, &HashSet<BlockNumberAndHash>)> {
        self.download_schedulers.iter().map(|(k, v)| (k, &v.hashes))
    }

    pub fn total_inflight_count(&self) -> usize {
        self.inflight_states.len()
    }

    pub fn division_point(&self) -> (u64, u64, u64) {
        (
            self.time_analyzer.fast_time,
            self.time_analyzer.normal_time,
            self.time_analyzer.low_time,
        )
    }

    pub fn peer_inflight_count(&self, peer: PeerIndex) -> usize {
        self.download_schedulers
            .get(&peer)
            .map(DownloadScheduler::inflight_count)
            .unwrap_or(0)
    }

    pub fn peer_can_fetch_count(&self, peer: PeerIndex) -> usize {
        self.download_schedulers.get(&peer).map_or(
            INIT_BLOCKS_IN_TRANSIT_PER_PEER,
            DownloadScheduler::can_fetch,
        )
    }

    pub fn inflight_block_by_peer(&self, peer: PeerIndex) -> Option<&HashSet<BlockNumberAndHash>> {
        self.download_schedulers.get(&peer).map(|d| &d.hashes)
    }

    pub fn inflight_state_by_block(&self, block: &BlockNumberAndHash) -> Option<&InflightState> {
        self.inflight_states.get(block)
    }

    pub fn mark_slow_block(&mut self, tip: BlockNumber) {
        let now = ckb_systemtime::unix_time_as_millis();
        for key in self.inflight_states.keys() {
            if key.number > tip + 1 {
                break;
            }
            self.trace_number.entry(key.clone()).or_insert(now);
        }
    }

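    /// Removes inflight entries that exceeded `BLOCK_DOWNLOAD_TIMEOUT`,
    /// optionally punishing the responsible peers, and returns the peers whose
    /// task quota dropped to zero and should therefore be disconnected.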
    pub fn prune(&mut self, tip: BlockNumber) -> HashSet<PeerIndex> {
        let now = unix_time_as_millis();
        let mut disconnect_list = HashSet::new();
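        // Quota punishment is applied only while more peers are downloading
        // than `protect_num`, so a small peer set is never punished away.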
        let should_punish = self.download_schedulers.len() > self.protect_num;
        let adjustment = self.adjustment;

        let trace = &mut self.trace_number;
        let download_schedulers = &mut self.download_schedulers;
        let states = &mut self.inflight_states;

        let mut remove_key = Vec::new();
        let end = tip + 20;
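        // `inflight_states` is an ordered BTreeMap, so scanning the window up
        // to `tip + 20` is enough here; entries above it are revisited by
        // later prune calls as the tip advances.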
        for (key, value) in states.iter() {
            if key.number > end {
                break;
            }
            if value.timestamp + BLOCK_DOWNLOAD_TIMEOUT < now {
                if let Some(set) = download_schedulers.get_mut(&value.peer) {
                    set.hashes.remove(key);
                    if should_punish && adjustment {
                        set.punish(2);
                    }
                };
                if !trace.is_empty() {
                    trace.remove(key);
                }
                remove_key.push(key.clone());
                debug!(
                    "prune: remove InflightState: remove {}-{} from {}",
                    key.number, key.hash, value.peer
                );

                if let Some(metrics) = ckb_metrics::handle() {
                    metrics.ckb_inflight_timeout_count.inc();
                }
            }
        }

        for key in remove_key {
            states.remove(&key);
        }

        download_schedulers.retain(|k, v| {
            if v.task_count == 0 {
                disconnect_list.insert(*k);
                false
            } else {
                true
            }
        });
        shrink_to_fit!(download_schedulers, SHRINK_THRESHOLD);

        if self.restart_number != 0 && tip + 1 > self.restart_number {
            self.restart_number = 0;
        }

        let timeout_limit = self.time_analyzer.low_time;

        let restart_number = &mut self.restart_number;
        trace.retain(|key, time| {
            if now > timeout_limit + *time {
                if let Some(state) = states.remove(key) {
                    if let Some(d) = download_schedulers.get_mut(&state.peer) {
                        if should_punish && adjustment {
                            d.punish(1);
                        }
                        d.hashes.remove(key);
                        debug!(
                            "prune: remove download_schedulers: remove {}-{} from {}",
                            key.number, key.hash, state.peer
                        );
                    };
                }

                if key.number > *restart_number {
                    *restart_number = key.number;
                }
                return false;
            }
            true
        });
        shrink_to_fit!(trace, SHRINK_THRESHOLD);

        disconnect_list
    }

    pub fn insert(&mut self, peer: PeerIndex, block: BlockNumberAndHash) -> bool {
        let state = self.inflight_states.entry(block.clone());
        match state {
            Entry::Occupied(_entry) => return false,
            Entry::Vacant(entry) => entry.insert(InflightState::new(peer)),
        };

        if self.restart_number >= block.number {
            self.trace_number
                .insert(block.clone(), unix_time_as_millis());
        }

        let download_scheduler = self.download_schedulers.entry(peer).or_default();
        download_scheduler.hashes.insert(block)
    }

    pub fn remove_by_peer(&mut self, peer: PeerIndex) -> usize {
        let trace = &mut self.trace_number;
        let state = &mut self.inflight_states;

        self.download_schedulers
            .remove(&peer)
            .map(|blocks| {
                let blocks_count = blocks.hashes.iter().len();
                for block in blocks.hashes {
                    state.remove(&block);
                    if !trace.is_empty() {
                        trace.remove(&block);
                    }
                }
                blocks_count
            })
            .unwrap_or_default()
    }

    pub fn remove_by_block(&mut self, block: BlockNumberAndHash) -> bool {
        let should_punish = self.download_schedulers.len() > self.protect_num;
        let download_schedulers = &mut self.download_schedulers;
        let trace = &mut self.trace_number;
        let time_analyzer = &mut self.time_analyzer;
        let adjustment = self.adjustment;
        self.inflight_states
            .remove(&block)
            .map(|state| {
                let elapsed = unix_time_as_millis().saturating_sub(state.timestamp);
                if let Some(set) = download_schedulers.get_mut(&state.peer) {
                    set.hashes.remove(&block);
                    if adjustment {
                        match time_analyzer.push_time(elapsed) {
                            TimeQuantile::MinToFast => set.increase(2),
                            TimeQuantile::FastToNormal => set.increase(1),
                            TimeQuantile::NormalToUpper => {
                                if should_punish {
                                    set.decrease(1)
                                }
                            }
                            TimeQuantile::UpperToMax => {
                                if should_punish {
                                    set.decrease(2)
                                }
                            }
                        }
                    }
                    if !trace.is_empty() {
                        trace.remove(&block);
                    }
                };
            })
            .is_some()
    }
}

impl Peers {
    pub fn sync_connected(
        &self,
        peer: PeerIndex,
        is_outbound: bool,
        is_whitelist: bool,
        is_2023edition: bool,
    ) {
        let protect_outbound = is_outbound
            && self
                .n_protected_outbound_peers
                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
                    if x < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT {
                        Some(x + 1)
                    } else {
                        None
                    }
                })
                .is_ok();

        let peer_flags = PeerFlags {
            is_outbound,
            is_whitelist,
            is_2023edition,
            is_protect: protect_outbound,
        };
        self.state
            .entry(peer)
            .and_modify(|state| {
                state.peer_flags = peer_flags;
                state.sync_connected();
            })
            .or_insert_with(|| {
                let mut state = PeerState::new(peer_flags);
                state.sync_connected();
                state
            });
    }

    pub fn relay_connected(&self, peer: PeerIndex) {
        self.state
            .entry(peer)
            .or_insert_with(|| PeerState::new(PeerFlags::default()));
    }

    pub fn get_best_known_header(&self, pi: PeerIndex) -> Option<HeaderIndex> {
        self.state
            .get(&pi)
            .and_then(|peer_state| peer_state.best_known_header.clone())
    }

    pub fn may_set_best_known_header(&self, peer: PeerIndex, header_index: HeaderIndex) {
        if let Some(mut peer_state) = self.state.get_mut(&peer) {
            if let Some(ref known) = peer_state.best_known_header {
                if header_index.is_better_chain(known) {
                    peer_state.best_known_header = Some(header_index);
                }
            } else {
                peer_state.best_known_header = Some(header_index);
            }
        }
    }

    pub fn get_last_common_header(&self, pi: PeerIndex) -> Option<BlockNumberAndHash> {
        self.state
            .get(&pi)
            .and_then(|peer_state| peer_state.last_common_header.clone())
    }

    pub fn set_last_common_header(&self, pi: PeerIndex, header: BlockNumberAndHash) {
        self.state
            .entry(pi)
            .and_modify(|peer_state| peer_state.last_common_header = Some(header));
    }

    pub fn getheaders_received(&self, _peer: PeerIndex) {}

    pub fn disconnected(&self, peer: PeerIndex) {
        if let Some(peer_state) = self.state.remove(&peer).map(|(_, peer_state)| peer_state) {
            if peer_state.sync_started() {
                assert_ne!(
                    self.n_sync_started.fetch_sub(1, Ordering::AcqRel),
                    0,
                    "n_sync_started overflow when disconnects"
                );
            }

            if peer_state.peer_flags.is_protect {
                assert_ne!(
                    self.n_protected_outbound_peers
                        .fetch_sub(1, Ordering::AcqRel),
                    0,
                    "n_protected_outbound_peers overflow when disconnects"
                );
            }
        }
    }

    pub fn insert_unknown_header_hash(&self, peer: PeerIndex, hash: Byte32) {
        self.state
            .entry(peer)
            .and_modify(|state| state.unknown_header_list.push(hash));
    }

    pub fn unknown_header_list_is_empty(&self, peer: PeerIndex) -> bool {
        self.state
            .get(&peer)
            .map(|state| state.unknown_header_list.is_empty())
            .unwrap_or(true)
    }

    pub fn clear_unknown_list(&self) {
        self.state.iter_mut().for_each(|mut state| {
            if !state.unknown_header_list.is_empty() {
                state.unknown_header_list.clear()
            }
        })
    }

    pub fn get_best_known_less_than_tip_and_unknown_empty(
        &self,
        tip: BlockNumber,
    ) -> Vec<PeerIndex> {
        self.state
            .iter()
            .filter_map(|kv_pair| {
                let (peer_index, state) = kv_pair.pair();
                if !state.unknown_header_list.is_empty() {
                    return None;
                }
                if let Some(ref header) = state.best_known_header {
                    if header.number() < tip {
                        return Some(*peer_index);
                    }
                }
                None
            })
            .collect()
    }

    pub fn take_unknown_last(&self, peer: PeerIndex) -> Option<Byte32> {
        self.state
            .get_mut(&peer)
            .and_then(|mut state| state.unknown_header_list.pop())
    }

    pub fn get_flag(&self, peer: PeerIndex) -> Option<PeerFlags> {
        self.state.get(&peer).map(|state| state.peer_flags)
    }
}

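/// Compact blocks waiting for their missing parts, keyed by block hash:
/// the compact block itself, per-peer index lists, and the timestamp when
/// the entry was created (see `compare_with_pending_compact`).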
pub(crate) type PendingCompactBlockMap = HashMap<
    Byte32,
    (
        packed::CompactBlock,
        HashMap<PeerIndex, (Vec<u32>, Vec<u32>)>,
        u64,
    ),
>;

#[derive(Clone)]
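/// State shared between the sync and relayer protocol handlers: the global
/// chain `Shared` handle plus the sync-specific `SyncState`.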
pub struct SyncShared {
    shared: Shared,
    state: Arc<SyncState>,
}

impl SyncShared {
    pub fn new(
        shared: Shared,
        sync_config: SyncConfig,
        tx_relay_receiver: Receiver<TxVerificationResult>,
    ) -> SyncShared {
        let (total_difficulty, header) = {
            let snapshot = shared.snapshot();
            (
                snapshot.total_difficulty().to_owned(),
                snapshot.tip_header().to_owned(),
            )
        };
        let shared_best_header = RwLock::new((header, total_difficulty).into());
        info!(
            "header_map.memory_limit {}",
            sync_config.header_map.memory_limit
        );

        let state = SyncState {
            shared_best_header,
            tx_filter: Mutex::new(TtlFilter::default()),
            unknown_tx_hashes: Mutex::new(KeyedPriorityQueue::new()),
            peers: Peers::default(),
            pending_get_block_proposals: DashMap::new(),
            pending_compact_blocks: tokio::sync::Mutex::new(HashMap::default()),
            inflight_proposals: DashMap::new(),
            inflight_blocks: RwLock::new(InflightBlocks::default()),
            pending_get_headers: RwLock::new(LruCache::new(GET_HEADERS_CACHE_SIZE)),
            tx_relay_receiver,
            min_chain_work: sync_config.min_chain_work,
        };

        SyncShared {
            shared,
            state: Arc::new(state),
        }
    }

    pub fn shared(&self) -> &Shared {
        &self.shared
    }

    pub fn active_chain(&self) -> ActiveChain {
        ActiveChain {
            sync_shared: self.clone(),
            snapshot: Arc::clone(&self.shared.snapshot()),
        }
    }

    pub fn store(&self) -> &ChainDB {
        self.shared.store()
    }

    pub fn state(&self) -> &SyncState {
        &self.state
    }

    pub fn consensus(&self) -> &Consensus {
        self.shared.consensus()
    }

    #[cfg(test)]
    pub(crate) fn blocking_insert_new_block(
        &self,
        chain: &ChainController,
        block: Arc<core::BlockView>,
    ) -> VerifyResult {
        chain.blocking_process_block(block)
    }

    pub(crate) fn accept_remote_block(&self, chain: &ChainController, remote_block: RemoteBlock) {
        {
            let entry = self
                .shared()
                .block_status_map()
                .entry(remote_block.block.header().hash());
            if let dashmap::mapref::entry::Entry::Vacant(entry) = entry {
                entry.insert(BlockStatus::BLOCK_RECEIVED);
            }
        }

        chain.asynchronous_process_remote_block(remote_block)
    }

    pub fn insert_valid_header(&self, peer: PeerIndex, header: &core::HeaderView) {
        let tip_number = self.active_chain().tip_number();
        let store_first = tip_number >= header.number();
        let parent_hash = Byte32::from_slice(header.data().raw().parent_hash().as_slice())
            .expect("checked slice length");
        let parent_header_index = self
            .get_header_index_view(&parent_hash, store_first)
            .expect("parent should be verified");
        let mut header_view = HeaderIndexView::new(
            header.hash(),
            header.number(),
            header.epoch(),
            header.timestamp(),
            parent_hash,
            parent_header_index.total_difficulty() + header.difficulty(),
        );

        let snapshot = Arc::clone(&self.shared.snapshot());
        header_view.build_skip(
            tip_number,
            |hash, store_first| self.get_header_index_view(hash, store_first),
            |number, current| {
                if current.number <= snapshot.tip_number() && snapshot.is_main_chain(&current.hash)
                {
                    snapshot
                        .get_block_hash(number)
                        .and_then(|hash| self.get_header_index_view(&hash, true))
                } else {
                    None
                }
            },
        );
        self.shared.header_map().insert(header_view.clone());
        self.state
            .peers()
            .may_set_best_known_header(peer, header_view.as_header_index());
        self.state.may_set_shared_best_header(header_view);
    }

    pub(crate) fn get_header_index_view(
        &self,
        hash: &Byte32,
        store_first: bool,
    ) -> Option<HeaderIndexView> {
        let store = self.store();
        if store_first {
            store
                .get_block_header(hash)
                .and_then(|header| {
                    store
                        .get_block_ext(hash)
                        .map(|block_ext| (header, block_ext.total_difficulty).into())
                })
                .or_else(|| self.shared.header_map().get(hash))
        } else {
            self.shared.header_map().get(hash).or_else(|| {
                store.get_block_header(hash).and_then(|header| {
                    store
                        .get_block_ext(hash)
                        .map(|block_ext| (header, block_ext.total_difficulty).into())
                })
            })
        }
    }

    pub fn is_stored(&self, hash: &packed::Byte32) -> bool {
        let status = self.active_chain().get_block_status(hash);
        status.contains(BlockStatus::BLOCK_STORED)
    }

    pub fn get_epoch_ext(&self, hash: &Byte32) -> Option<EpochExt> {
        self.store().get_block_epoch(hash)
    }

    pub fn insert_peer_unknown_header_list(&self, pi: PeerIndex, header_list: Vec<Byte32>) {
        if self.state().peers.unknown_header_list_is_empty(pi) {
            for hash in header_list {
                if let Some(header) = self.shared().header_map().get(&hash) {
                    self.state()
                        .peers
                        .may_set_best_known_header(pi, header.as_header_index());
                    break;
                } else {
                    self.state().peers.insert_unknown_header_hash(pi, hash)
                }
            }
        }
    }

    pub fn new_block_received(&self, block: &core::BlockView) -> bool {
        if !self
            .state()
            .write_inflight_blocks()
            .remove_by_block((block.number(), block.hash()).into())
        {
            return false;
        }

        let status = self.active_chain().get_block_status(&block.hash());
        debug!(
            "new_block_received {}-{}, status: {:?}",
            block.number(),
            block.hash(),
            status
        );
        if status != BlockStatus::HEADER_VALID {
            return false;
        }

        if let dashmap::mapref::entry::Entry::Vacant(status) =
            self.shared().block_status_map().entry(block.hash())
        {
            status.insert(BlockStatus::BLOCK_RECEIVED);
            return true;
        }
        false
    }
}

impl HeaderFieldsProvider for SyncShared {
    fn get_header_fields(&self, hash: &Byte32) -> Option<HeaderFields> {
        self.shared
            .header_map()
            .get(hash)
            .map(|header| HeaderFields {
                hash: header.hash(),
                number: header.number(),
                epoch: header.epoch(),
                timestamp: header.timestamp(),
                parent_hash: header.parent_hash(),
            })
            .or_else(|| {
                self.store()
                    .get_block_header(hash)
                    .map(|header| HeaderFields {
                        hash: header.hash(),
                        number: header.number(),
                        epoch: header.epoch(),
                        timestamp: header.timestamp(),
                        parent_hash: header.parent_hash(),
                    })
            })
    }
}

#[derive(Eq, PartialEq, Clone)]
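/// Retry schedule for a transaction hash we have not seen yet: when it was
/// (re)requested, which peers advertised it, and whether a request is
/// outstanding. Ordered so the earliest `next_request_at` is popped first.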
pub struct UnknownTxHashPriority {
    request_time: Instant,
    peers: Vec<PeerIndex>,
    requested: bool,
}

impl UnknownTxHashPriority {
    pub fn should_request(&self, now: Instant) -> bool {
        self.next_request_at() < now
    }

    pub fn next_request_at(&self) -> Instant {
        if self.requested {
            self.request_time + RETRY_ASK_TX_TIMEOUT_INCREASE
        } else {
            self.request_time
        }
    }

    pub fn next_request_peer(&mut self) -> Option<PeerIndex> {
        if self.requested {
            if self.peers.len() > 1 {
                self.request_time = Instant::now();
                self.peers.swap_remove(0);
                self.peers.first().cloned()
            } else {
                None
            }
        } else {
            self.requested = true;
            self.peers.first().cloned()
        }
    }

    pub fn push_peer(&mut self, peer_index: PeerIndex) {
        self.peers.push(peer_index);
    }

    pub fn requesting_peer(&self) -> Option<PeerIndex> {
        if self.requested {
            self.peers.first().cloned()
        } else {
            None
        }
    }
}

impl Ord for UnknownTxHashPriority {
    fn cmp(&self, other: &Self) -> cmp::Ordering {
        self.next_request_at()
            .cmp(&other.next_request_at())
            .reverse()
    }
}

impl PartialOrd for UnknownTxHashPriority {
    fn partial_cmp(&self, other: &Self) -> Option<cmp::Ordering> {
        Some(self.cmp(other))
    }
}

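/// Mutable synchronization state shared across protocol handlers: the best
/// known header, known/unknown transaction bookkeeping, pending GetHeaders
/// and compact-block requests, and inflight block/proposal tracking.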
pub struct SyncState {
    shared_best_header: RwLock<HeaderIndexView>,
    tx_filter: Mutex<TtlFilter<Byte32>>,
    unknown_tx_hashes: Mutex<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>>,
    peers: Peers,

    pending_get_block_proposals: DashMap<packed::ProposalShortId, HashSet<PeerIndex>>,
    pending_get_headers: RwLock<LruCache<(PeerIndex, Byte32), Instant>>,
    pending_compact_blocks: tokio::sync::Mutex<PendingCompactBlockMap>,

    inflight_proposals: DashMap<packed::ProposalShortId, BlockNumber>,
    inflight_blocks: RwLock<InflightBlocks>,

    tx_relay_receiver: Receiver<TxVerificationResult>,
    min_chain_work: U256,
}

impl SyncState {
    pub fn min_chain_work(&self) -> &U256 {
        &self.min_chain_work
    }

    pub fn min_chain_work_ready(&self) -> bool {
        self.shared_best_header
            .read()
            .is_better_than(&self.min_chain_work)
    }

    pub fn n_sync_started(&self) -> &AtomicUsize {
        &self.peers.n_sync_started
    }

    pub fn peers(&self) -> &Peers {
        &self.peers
    }

    pub fn compare_with_pending_compact(&self, hash: &Byte32, now: u64) -> bool {
        let pending = self.pending_compact_blocks.blocking_lock();
        pending.is_empty()
            || pending
                .get(hash)
                .map(|(_, _, time)| now > time + 2000)
                .unwrap_or(true)
    }

    pub async fn pending_compact_blocks(
        &self,
    ) -> tokio::sync::MutexGuard<'_, PendingCompactBlockMap> {
        self.pending_compact_blocks.lock().await
    }

    pub fn read_inflight_blocks(&self) -> RwLockReadGuard<InflightBlocks> {
        self.inflight_blocks.read()
    }

    pub fn write_inflight_blocks(&self) -> RwLockWriteGuard<InflightBlocks> {
        self.inflight_blocks.write()
    }

    pub fn take_relay_tx_verify_results(&self, limit: usize) -> Vec<TxVerificationResult> {
        self.tx_relay_receiver.try_iter().take(limit).collect()
    }

    pub fn shared_best_header(&self) -> HeaderIndexView {
        self.shared_best_header.read().to_owned()
    }

    pub fn shared_best_header_ref(&self) -> RwLockReadGuard<HeaderIndexView> {
        self.shared_best_header.read()
    }

    pub fn may_set_shared_best_header(&self, header: HeaderIndexView) {
        let mut shared_best_header = self.shared_best_header.write();
        if !header.is_better_than(shared_best_header.total_difficulty()) {
            return;
        }

        if let Some(metrics) = ckb_metrics::handle() {
            metrics.ckb_shared_best_number.set(header.number() as i64);
        }
        *shared_best_header = header;
    }

    pub(crate) fn suspend_sync(&self, peer_state: &mut PeerState) {
        if peer_state.sync_started() {
            assert_ne!(
                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
                0,
                "n_sync_started overflow when suspend_sync"
            );
        }
        peer_state.suspend_sync(SUSPEND_SYNC_TIME);
    }

    pub(crate) fn tip_synced(&self, peer_state: &mut PeerState) {
        if peer_state.sync_started() {
            assert_ne!(
                self.peers.n_sync_started.fetch_sub(1, Ordering::AcqRel),
                0,
                "n_sync_started overflow when tip_synced"
            );
        }
        peer_state.tip_synced();
    }

    pub fn mark_as_known_tx(&self, hash: Byte32) {
        self.mark_as_known_txs(iter::once(hash));
    }

    pub fn remove_from_known_txs(&self, hash: &Byte32) {
        self.tx_filter.lock().remove(hash);
    }

    pub fn mark_as_known_txs(&self, hashes: impl Iterator<Item = Byte32> + std::clone::Clone) {
        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
        let mut tx_filter = self.tx_filter.lock();

        for hash in hashes {
            unknown_tx_hashes.remove(&hash);
            tx_filter.insert(hash);
        }
    }

    pub fn pop_ask_for_txs(&self) -> HashMap<PeerIndex, Vec<Byte32>> {
        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();
        let mut result: HashMap<PeerIndex, Vec<Byte32>> = HashMap::new();
        let now = Instant::now();

        if !unknown_tx_hashes
            .peek()
            .map(|(_tx_hash, priority)| priority.should_request(now))
            .unwrap_or_default()
        {
            return result;
        }

        while let Some((tx_hash, mut priority)) = unknown_tx_hashes.pop() {
            if priority.should_request(now) {
                if let Some(peer_index) = priority.next_request_peer() {
                    result
                        .entry(peer_index)
                        .and_modify(|hashes| hashes.push(tx_hash.clone()))
                        .or_insert_with(|| vec![tx_hash.clone()]);
                    unknown_tx_hashes.push(tx_hash, priority);
                }
            } else {
                unknown_tx_hashes.push(tx_hash, priority);
                break;
            }
        }
        result
    }

    pub fn add_ask_for_txs(&self, peer_index: PeerIndex, tx_hashes: Vec<Byte32>) -> Status {
        let mut unknown_tx_hashes = self.unknown_tx_hashes.lock();

        for tx_hash in tx_hashes
            .into_iter()
            .take(MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER)
        {
            match unknown_tx_hashes.entry(tx_hash) {
                keyed_priority_queue::Entry::Occupied(entry) => {
                    let mut priority = entry.get_priority().clone();
                    priority.push_peer(peer_index);
                    entry.set_priority(priority);
                }
                keyed_priority_queue::Entry::Vacant(entry) => {
                    entry.set_priority(UnknownTxHashPriority {
                        request_time: Instant::now(),
                        peers: vec![peer_index],
                        requested: false,
                    })
                }
            }
        }

        if unknown_tx_hashes.len() >= MAX_UNKNOWN_TX_HASHES_SIZE
            || unknown_tx_hashes.len()
                >= self.peers.state.len() * MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER
        {
            warn!(
                "unknown_tx_hashes is too long, len: {}",
                unknown_tx_hashes.len()
            );

            let mut peer_unknown_counter = 0;
            for (_hash, priority) in unknown_tx_hashes.iter() {
                for peer in priority.peers.iter() {
                    if *peer == peer_index {
                        peer_unknown_counter += 1;
                    }
                }
            }
            if peer_unknown_counter >= MAX_UNKNOWN_TX_HASHES_SIZE_PER_PEER {
                return StatusCode::TooManyUnknownTransactions.into();
            }

            return Status::ignored();
        }

        Status::ok()
    }

    pub fn already_known_tx(&self, hash: &Byte32) -> bool {
        self.tx_filter.lock().contains(hash)
    }

    pub fn tx_filter(&self) -> MutexGuard<TtlFilter<Byte32>> {
        self.tx_filter.lock()
    }

    pub fn unknown_tx_hashes(
        &self,
    ) -> MutexGuard<KeyedPriorityQueue<Byte32, UnknownTxHashPriority>> {
        self.unknown_tx_hashes.lock()
    }

    pub fn insert_inflight_proposals(
        &self,
        ids: Vec<packed::ProposalShortId>,
        block_number: BlockNumber,
    ) -> Vec<bool> {
        ids.into_iter()
            .map(|id| match self.inflight_proposals.entry(id) {
                dashmap::mapref::entry::Entry::Occupied(mut occupied) => {
                    if *occupied.get() < block_number {
                        occupied.insert(block_number);
                        true
                    } else {
                        false
                    }
                }
                dashmap::mapref::entry::Entry::Vacant(vacant) => {
                    vacant.insert(block_number);
                    true
                }
            })
            .collect()
    }

    pub fn remove_inflight_proposals(&self, ids: &[packed::ProposalShortId]) -> Vec<bool> {
        ids.iter()
            .map(|id| self.inflight_proposals.remove(id).is_some())
            .collect()
    }

    pub fn clear_expired_inflight_proposals(&self, keep_min_block_number: BlockNumber) {
        self.inflight_proposals
            .retain(|_, block_number| *block_number >= keep_min_block_number);
    }

    pub fn contains_inflight_proposal(&self, proposal_id: &packed::ProposalShortId) -> bool {
        self.inflight_proposals.contains_key(proposal_id)
    }

    pub fn drain_get_block_proposals(
        &self,
    ) -> DashMap<packed::ProposalShortId, HashSet<PeerIndex>> {
        let ret = self.pending_get_block_proposals.clone();
        self.pending_get_block_proposals.clear();
        ret
    }

    pub fn insert_get_block_proposals(&self, pi: PeerIndex, ids: Vec<packed::ProposalShortId>) {
        for id in ids.into_iter() {
            self.pending_get_block_proposals
                .entry(id)
                .or_default()
                .insert(pi);
        }
    }

    pub fn disconnected(&self, pi: PeerIndex) {
        let removed_inflight_blocks_count = self.write_inflight_blocks().remove_by_peer(pi);
        if removed_inflight_blocks_count > 0 {
            debug!(
                "disconnected {}, remove {} inflight blocks",
                pi, removed_inflight_blocks_count
            )
        }
        self.peers().disconnected(pi);
    }
}

#[derive(Clone)]
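/// A chain view pinned to a single snapshot, so that a batch of queries sees
/// a consistent tip while the underlying chain keeps advancing.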
pub struct ActiveChain {
    sync_shared: SyncShared,
    snapshot: Arc<Snapshot>,
}

#[doc(hidden)]
impl ActiveChain {
    pub(crate) fn sync_shared(&self) -> &SyncShared {
        &self.sync_shared
    }

    pub fn shared(&self) -> &Shared {
        self.sync_shared.shared()
    }

    fn store(&self) -> &ChainDB {
        self.sync_shared.store()
    }

    pub fn state(&self) -> &SyncState {
        self.sync_shared.state()
    }

    fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }

    pub fn get_block_hash(&self, number: BlockNumber) -> Option<packed::Byte32> {
        self.snapshot().get_block_hash(number)
    }

    pub fn get_block(&self, h: &packed::Byte32) -> Option<core::BlockView> {
        self.store().get_block(h)
    }

    pub fn get_block_header(&self, h: &packed::Byte32) -> Option<core::HeaderView> {
        self.store().get_block_header(h)
    }

    pub fn get_block_ext(&self, h: &packed::Byte32) -> Option<core::BlockExt> {
        self.snapshot().get_block_ext(h)
    }

    pub fn get_block_filter(&self, hash: &packed::Byte32) -> Option<packed::Bytes> {
        self.store().get_block_filter(hash)
    }

    pub fn get_block_filter_hash(&self, hash: &packed::Byte32) -> Option<packed::Byte32> {
        self.store().get_block_filter_hash(hash)
    }

    pub fn get_latest_built_filter_block_number(&self) -> BlockNumber {
        self.snapshot
            .get_latest_built_filter_data_block_hash()
            .and_then(|hash| self.snapshot.get_block_number(&hash))
            .unwrap_or_default()
    }

    pub fn total_difficulty(&self) -> &U256 {
        self.snapshot.total_difficulty()
    }

    pub fn tip_header(&self) -> core::HeaderView {
        self.snapshot.tip_header().clone()
    }

    pub fn tip_hash(&self) -> Byte32 {
        self.snapshot.tip_hash()
    }

    pub fn tip_number(&self) -> BlockNumber {
        self.snapshot.tip_number()
    }

    pub fn epoch_ext(&self) -> core::EpochExt {
        self.snapshot.epoch_ext().clone()
    }

    pub fn is_main_chain(&self, hash: &packed::Byte32) -> bool {
        self.snapshot.is_main_chain(hash)
    }

    pub fn is_unverified_chain(&self, hash: &packed::Byte32) -> bool {
        self.store().get_block_epoch_index(hash).is_some()
    }

    pub fn is_initial_block_download(&self) -> bool {
        self.shared().is_initial_block_download()
    }

    pub fn unverified_tip_header(&self) -> HeaderIndex {
        self.shared().get_unverified_tip()
    }

    pub fn unverified_tip_hash(&self) -> Byte32 {
        self.unverified_tip_header().hash()
    }

    pub fn unverified_tip_number(&self) -> BlockNumber {
        self.unverified_tip_header().number()
    }

    pub fn get_ancestor(&self, base: &Byte32, number: BlockNumber) -> Option<HeaderIndexView> {
        self.get_ancestor_internal(base, number, false)
    }

    pub fn get_ancestor_with_unverified(
        &self,
        base: &Byte32,
        number: BlockNumber,
    ) -> Option<HeaderIndexView> {
        self.get_ancestor_internal(base, number, true)
    }

    fn get_ancestor_internal(
        &self,
        base: &Byte32,
        number: BlockNumber,
        with_unverified: bool,
    ) -> Option<HeaderIndexView> {
        let tip_number = {
            if with_unverified {
                self.unverified_tip_number()
            } else {
                self.tip_number()
            }
        };

        let block_is_on_chain_fn = |hash: &Byte32| {
            if with_unverified {
                self.is_unverified_chain(hash)
            } else {
                self.is_main_chain(hash)
            }
        };

        let get_header_view_fn = |hash: &Byte32, store_first: bool| {
            self.sync_shared.get_header_index_view(hash, store_first)
        };

        let fast_scanner_fn = |number: BlockNumber, current: BlockNumberAndHash| {
            if current.number <= tip_number && block_is_on_chain_fn(&current.hash) {
                self.get_block_hash(number)
                    .and_then(|hash| self.sync_shared.get_header_index_view(&hash, true))
            } else {
                None
            }
        };

        self.sync_shared
            .get_header_index_view(base, false)?
            .get_ancestor(tip_number, number, get_header_view_fn, fast_scanner_fn)
    }

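    /// Builds a block locator starting at `start`: the first 10 entries step
    /// back one block at a time, afterwards the step doubles each round, and
    /// the genesis hash is appended last unless it was already reached.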
    pub fn get_locator(&self, start: BlockNumberAndHash) -> Vec<Byte32> {
        let mut step = 1;
        let mut locator = Vec::with_capacity(32);
        let mut index = start.number();
        let mut base = start.hash();

        loop {
            let header_hash = self
                .get_ancestor(&base, index)
                .unwrap_or_else(|| {
                    panic!(
                        "index calculated in get_locator: \
                         start: {:?}, base: {}, step: {}, locators({}): {:?}.",
                        start,
                        base,
                        step,
                        locator.len(),
                        locator,
                    )
                })
                .hash();
            locator.push(header_hash.clone());

            if locator.len() >= 10 {
                step <<= 1;
            }

            if index < step * 2 {
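                // Close to the start of the range: while the locator is short
                // and still above the last day's blocks, halve the index so
                // recent history stays densely sampled before finishing.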
                if locator.len() < 52 && index > ONE_DAY_BLOCK_NUMBER {
                    index >>= 1;
                    base = header_hash;
                    continue;
                }
                if index != 0 {
                    locator.push(self.sync_shared.consensus().genesis_hash());
                }
                break;
            }
            index -= step;
            base = header_hash;
        }
        locator
    }

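    /// Finds the highest block shared by both chains: first walks the higher
    /// chain down to the same number, then steps both back one block at a
    /// time until the hashes match.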
    pub fn last_common_ancestor(
        &self,
        pa: &BlockNumberAndHash,
        pb: &BlockNumberAndHash,
    ) -> Option<BlockNumberAndHash> {
        let (mut m_left, mut m_right) = if pa.number() > pb.number() {
            (pb.clone(), pa.clone())
        } else {
            (pa.clone(), pb.clone())
        };

        m_right = self
            .get_ancestor(&m_right.hash(), m_left.number())?
            .number_and_hash();
        if m_left == m_right {
            return Some(m_left);
        }
        debug_assert!(m_left.number() == m_right.number());

        while m_left != m_right {
            m_left = self
                .get_ancestor(&m_left.hash(), m_left.number() - 1)?
                .number_and_hash();
            m_right = self
                .get_ancestor(&m_right.hash(), m_right.number() - 1)?
                .number_and_hash();
        }
        Some(m_left)
    }

    pub fn locate_latest_common_block(
        &self,
        _hash_stop: &Byte32,
        locator: &[Byte32],
    ) -> Option<BlockNumber> {
        if locator.is_empty() {
            return None;
        }

        let locator_hash = locator.last().expect("empty checked");
        if locator_hash != &self.sync_shared.consensus().genesis_hash() {
            return None;
        }

        let (index, latest_common) = locator
            .iter()
            .enumerate()
            .map(|(index, hash)| (index, self.snapshot.get_block_number(hash)))
            .find(|(_index, number)| number.is_some())
            .expect("locator last checked");

        if index == 0 || latest_common == Some(0) {
            return latest_common;
        }

        if let Some(header) = locator
            .get(index - 1)
            .and_then(|hash| self.sync_shared.store().get_block_header(hash))
        {
            let mut block_hash = header.data().raw().parent_hash();
            loop {
                let block_header = match self.sync_shared.store().get_block_header(&block_hash) {
                    None => break latest_common,
                    Some(block_header) => block_header,
                };

                if let Some(block_number) = self.snapshot.get_block_number(&block_hash) {
                    return Some(block_number);
                }

                block_hash = block_header.data().raw().parent_hash();
            }
        } else {
            latest_common
        }
    }

    pub fn get_locator_response(
        &self,
        block_number: BlockNumber,
        hash_stop: &Byte32,
    ) -> Vec<core::HeaderView> {
        let tip_number = self.tip_header().number();
        let max_height = cmp::min(
            block_number + 1 + MAX_HEADERS_LEN as BlockNumber,
            tip_number + 1,
        );
        (block_number + 1..max_height)
            .filter_map(|block_number| self.snapshot.get_block_hash(block_number))
            .take_while(|block_hash| block_hash != hash_stop)
            .filter_map(|block_hash| self.sync_shared.store().get_block_header(&block_hash))
            .collect()
    }

    pub fn send_getheaders_to_peer(
        &self,
        nc: &Arc<dyn CKBProtocolContext + Sync>,
        peer: PeerIndex,
        block_number_and_hash: BlockNumberAndHash,
    ) {
        if let Some(last_time) = self
            .state()
            .pending_get_headers
            .write()
            .get(&(peer, block_number_and_hash.hash()))
        {
            if Instant::now() < *last_time + GET_HEADERS_TIMEOUT {
                debug!(
                    "Last get_headers request to peer {} was less than {:?} ago; ignore it.",
                    peer, GET_HEADERS_TIMEOUT,
                );
                return;
            } else {
                debug!(
                    "Could not get headers from {} within {:?}; retry",
                    peer, GET_HEADERS_TIMEOUT,
                );
            }
        }
        self.state()
            .pending_get_headers
            .write()
            .put((peer, block_number_and_hash.hash()), Instant::now());

        debug!(
            "send_getheaders_to_peer peer={}, hash={}",
            peer,
            block_number_and_hash.hash()
        );
        let locator_hash = self.get_locator(block_number_and_hash);
        let content = packed::GetHeaders::new_builder()
            .block_locator_hashes(locator_hash)
            .hash_stop(packed::Byte32::zero())
            .build();
        let message = packed::SyncMessage::new_builder().set(content).build();
        let nc = Arc::clone(nc);
        self.shared().async_handle().spawn(async move {
            async_send_message(SupportProtocols::Sync.protocol_id(), &nc, peer, &message).await
        });
    }

    pub fn get_block_status(&self, block_hash: &Byte32) -> BlockStatus {
        self.shared().get_block_status(block_hash)
    }

    pub fn contains_block_status(&self, block_hash: &Byte32, status: BlockStatus) -> bool {
        self.get_block_status(block_hash).contains(status)
    }
}

#[derive(Clone, Copy, Debug)]
pub enum IBDState {
    In,
    Out,
}

impl From<bool> for IBDState {
    fn from(src: bool) -> Self {
        if src { IBDState::In } else { IBDState::Out }
    }
}

impl From<IBDState> for bool {
    fn from(s: IBDState) -> bool {
        match s {
            IBDState::In => true,
            IBDState::Out => false,
        }
    }
}

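/// Logs the outcome of processing a sync/relay message, banning the peer when
/// the status demands it.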
pub(crate) fn post_sync_process(
    nc: &dyn CKBProtocolContext,
    peer: PeerIndex,
    item_name: &str,
    status: Status,
) {
    if let Some(ban_time) = status.should_ban() {
        error!(
            "Receive {} from {}. Ban {:?} for {}",
            item_name, peer, ban_time, status
        );
        nc.ban_peer(peer, ban_time, status.to_string());
    } else if status.should_warn() {
        warn!("Receive {} from {}, {}", item_name, peer, status);
    } else if !status.is_ok() {
        debug!("Receive {} from {}, {}", item_name, peer, status);
    }
}