ckb_sync/synchronizer/
mod.rs

//! A CKB node has an initial block download phase (IBD mode), like Bitcoin:
//! <https://btcinformation.org/en/glossary/initial-block-download>
//!
//! While a CKB node is in IBD mode, it responds to `GetHeaders` and `GetBlocks` requests with
//! `packed::InIBD`.
//!
//! CKB also uses a headers-first synchronization style, like Bitcoin:
//! <https://btcinformation.org/en/glossary/headers-first-sync>
//!
9mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{Receiver, select};
31use ckb_constant::sync::{
32    BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33    INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38    CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceControl, SupportProtocols,
39    async_trait, bytes::Bytes, tokio,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48    core::BlockNumber,
49    packed::{self, Byte32},
50    prelude::*,
51};
52use std::{
53    collections::HashSet,
54    sync::{Arc, atomic::Ordering},
55    time::{Duration, Instant},
56};
57
// NOTE(review): the tokens and intervals below are presumably the timers registered by this
// handler's `CKBProtocolHandler` implementation, which is not visible in this chunk — confirm.

/// Timer token associated with sending `GetHeaders`.
pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
/// Timer token associated with block fetching while in IBD.
pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
/// Timer token associated with block fetching outside IBD.
pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
/// Timer token associated with timeout-based peer eviction.
pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
/// Timer token associated with the "no peer" check.
pub const NO_PEER_CHECK_TOKEN: u64 = 255;

/// One second.
const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(1_000);
/// Block-fetch interval while in IBD.
const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
/// Block-fetch interval outside IBD.
const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
68#[derive(Copy, Clone)]
69enum CanStart {
70    FetchToTarget(BlockNumber),
71    Ready,
72    MinWorkNotReach,
73    AssumeValidNotFound,
74}
75
76struct FetchCMD {
77    peers: Vec<PeerIndex>,
78    ibd_state: IBDState,
79}
80
81struct BlockFetchCMD {
82    sync_shared: Arc<SyncShared>,
83    p2p_control: ServiceControl,
84    recv: channel::Receiver<FetchCMD>,
85    can_start: CanStart,
86    number: BlockNumber,
87    start_timestamp: u64,
88}
89
90impl BlockFetchCMD {
91    fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92        let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94        let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95            for peer in peers {
96                if ckb_stop_handler::has_received_stop_signal() {
97                    return;
98                }
99
100                let mut fetch_end: BlockNumber = u64::MAX;
101                if assume_target != 0 {
102                    fetch_end = assume_target
103                }
104
105                if let Some(fetch) =
106                    BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107                        .fetch(fetch_end)
108                {
109                    for item in fetch {
110                        if ckb_stop_handler::has_received_stop_signal() {
111                            return;
112                        }
113                        BlockFetchCMD::send_getblocks(item, &cmd.p2p_control, peer);
114                    }
115                }
116            }
117        };
118
119        match self.can_start() {
120            CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
121            CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
122            CanStart::MinWorkNotReach => {
123                let best_known = self.sync_shared.state().shared_best_header_ref();
124                let number = best_known.number();
125                if number != self.number && (number - self.number) % 10000 == 0 {
126                    self.number = number;
127                    info!(
128                        "The current best known header number: {}, total difficulty: {:#x}. \
129                                 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
130                        number,
131                        best_known.total_difficulty(),
132                        self.sync_shared.state().min_chain_work()
133                    );
134                }
135            }
136            CanStart::AssumeValidNotFound => {
137                let state = self.sync_shared.state();
138                let shared = self.sync_shared.shared();
139                let best_known = state.shared_best_header_ref();
140                let number = best_known.number();
141                let assume_valid_target: Byte32 = shared
142                    .assume_valid_targets()
143                    .as_ref()
144                    .and_then(|targets| targets.first())
145                    .map(Pack::pack)
146                    .expect("assume valid target must exist");
147
148                if number != self.number && (number - self.number) % 10000 == 0 {
149                    self.number = number;
150                    let remaining_headers_sync_log = self.reaming_headers_sync_log();
151
152                    info!(
153                        "best known header {}-{}, \
154                                 CKB is syncing to latest Header to find the assume valid target: {}. \
155                                 Please wait. {}",
156                        number,
157                        best_known.hash(),
158                        assume_valid_target,
159                        remaining_headers_sync_log
160                    );
161                }
162            }
163        }
164    }
165
166    fn reaming_headers_sync_log(&self) -> String {
167        if let Some(remaining_headers_needed) = self.calc_time_need_to_reach_latest_tip_header() {
168            format!(
169                "Need {} minutes to sync to the latest Header.",
170                remaining_headers_needed.as_secs() / 60
171            )
172        } else {
173            "".to_string()
174        }
175    }
176
177    // Timeline:
178    //
179    // |-------------------|--------------------------------|------------|---->
180    // Genesis  (shared best timestamp)                     |           now
181    // |                   |                                |            |
182    // |             (Sync point)                  (CKB process start)   |
183    // |                   |                                             |
184    // |--Synced Part------|------------ Remain to Sync -----------------|
185    // |                                                                 |
186    // |------------------- CKB Chain Age -------------------------------|
187    //
188    fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
189        let genesis_timestamp = self
190            .sync_shared
191            .consensus()
192            .genesis_block()
193            .header()
194            .timestamp();
195        let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
196
197        let ckb_process_start_timestamp = self.start_timestamp;
198
199        let now_timestamp = unix_time_as_millis();
200
201        let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)?;
202
203        let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)?;
204
205        let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)?;
206
207        let ckb_sync_header_speed = has_synced_headers_age.checked_div(ckb_process_age)?;
208
209        let sync_all_headers_timecost = ckb_chain_age.checked_div(ckb_sync_header_speed)?;
210
211        let sync_remaining_headers_needed =
212            sync_all_headers_timecost.checked_sub(ckb_process_age)?;
213
214        Some(Duration::from_millis(sync_remaining_headers_needed))
215    }
216
217    fn run(&mut self, stop_signal: Receiver<()>) {
218        loop {
219            select! {
220                recv(self.recv) -> msg => {
221                    if let Ok(cmd) = msg {
222                        self.process_fetch_cmd(cmd)
223                    }
224                }
225                recv(stop_signal) -> _ => {
226                    info!("BlockDownload received exit signal, exit now");
227                    return;
228                }
229            }
230        }
231    }
232
233    fn can_start(&mut self) -> CanStart {
234        if let CanStart::Ready = self.can_start {
235            return self.can_start;
236        }
237
238        let shared = self.sync_shared.shared();
239        let state = self.sync_shared.state();
240
241        let min_work_reach = |flag: &mut CanStart| {
242            if state.min_chain_work_ready() {
243                *flag = CanStart::AssumeValidNotFound;
244            }
245        };
246
247        let assume_valid_target_find = |flag: &mut CanStart| {
248            let mut assume_valid_targets = shared.assume_valid_targets();
249            if let Some(ref targets) = *assume_valid_targets {
250                if targets.is_empty() {
251                    assume_valid_targets.take();
252                    *flag = CanStart::Ready;
253                    return;
254                }
255                let first_target = targets
256                    .first()
257                    .expect("has checked targets is not empty, assume valid target must exist");
258                match shared.header_map().get(&first_target.into()) {
259                    Some(header) => {
260                        if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
261                        {
262                            // BlockFetchCMD has set the fetch target, no need to set it again
263                        } else {
264                            *flag = CanStart::FetchToTarget(header.number());
265                            info!(
266                                "assume valid target found in header_map; CKB will start fetch blocks to {:?} now",
267                                header.number_and_hash()
268                            );
269                        }
270                        // Blocks that are no longer in the scope of ibd must be forced to verify
271                        if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
272                            assume_valid_targets.take();
273                            warn!(
274                                "the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on"
275                            );
276                        }
277                    }
278                    None => {
279                        // Best known already not in the scope of ibd, it means target is invalid
280                        if unix_time_as_millis()
281                            .saturating_sub(state.shared_best_header_ref().timestamp())
282                            < MAX_TIP_AGE
283                        {
284                            warn!(
285                                "the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on"
286                            );
287                            *flag = CanStart::Ready;
288                            assume_valid_targets.take();
289                        }
290                    }
291                }
292            } else {
293                *flag = CanStart::Ready;
294            }
295        };
296
297        match self.can_start {
298            CanStart::FetchToTarget(_) => {
299                assume_valid_target_find(&mut self.can_start);
300                self.can_start
301            }
302            CanStart::Ready => self.can_start,
303            CanStart::MinWorkNotReach => {
304                min_work_reach(&mut self.can_start);
305                if let CanStart::AssumeValidNotFound = self.can_start {
306                    assume_valid_target_find(&mut self.can_start);
307                }
308                self.can_start
309            }
310            CanStart::AssumeValidNotFound => {
311                assume_valid_target_find(&mut self.can_start);
312                self.can_start
313            }
314        }
315    }
316
317    fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: &ServiceControl, peer: PeerIndex) {
318        let content = packed::GetBlocks::new_builder()
319            .block_hashes(v_fetch.clone())
320            .build();
321        let message = packed::SyncMessage::new_builder().set(content).build();
322
323        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
324        if let Err(err) = nc.send_message_to(
325            peer,
326            SupportProtocols::Sync.protocol_id(),
327            message.as_bytes(),
328        ) {
329            debug!("synchronizer sending GetBlocks error: {:?}", err);
330        }
331    }
332}
333
334/// Sync protocol handle
335pub struct Synchronizer {
336    pub(crate) chain: ChainController,
337    /// Sync shared state
338    pub shared: Arc<SyncShared>,
339    fetch_channel: Option<channel::Sender<FetchCMD>>,
340}
341
342impl Synchronizer {
343    /// Init sync protocol handle
344    ///
345    /// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
346    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
347        Synchronizer {
348            chain,
349            shared,
350            fetch_channel: None,
351        }
352    }
353
354    /// Get shared state
355    pub fn shared(&self) -> &Arc<SyncShared> {
356        &self.shared
357    }
358
359    fn try_process(
360        &self,
361        nc: Arc<dyn CKBProtocolContext + Sync>,
362        peer: PeerIndex,
363        message: packed::SyncMessageUnionReader<'_>,
364    ) -> Status {
365        let _trace_timecost: Option<HistogramTimer> = {
366            ckb_metrics::handle().map(|handle| {
367                handle
368                    .ckb_sync_msg_process_duration
369                    .with_label_values(&[message.item_name()])
370                    .start_timer()
371            })
372        };
373
374        match message {
375            packed::SyncMessageUnionReader::GetHeaders(reader) => {
376                GetHeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
377            }
378            packed::SyncMessageUnionReader::SendHeaders(reader) => {
379                HeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
380            }
381            packed::SyncMessageUnionReader::GetBlocks(reader) => {
382                GetBlocksProcess::new(reader, self, peer, nc.as_ref()).execute()
383            }
384            packed::SyncMessageUnionReader::SendBlock(reader) => {
385                if reader.check_data() {
386                    BlockProcess::new(reader, self, peer, nc).execute()
387                } else {
388                    StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
389                }
390            }
391            packed::SyncMessageUnionReader::InIBD(_) => {
392                InIBDProcess::new(self, peer, nc.as_ref()).execute()
393            }
394        }
395    }
396
397    fn process(
398        &self,
399        nc: Arc<dyn CKBProtocolContext + Sync>,
400        peer: PeerIndex,
401        message: packed::SyncMessageUnionReader<'_>,
402    ) {
403        let item_name = message.item_name();
404        let item_bytes = message.as_slice().len() as u64;
405        let status = self.try_process(Arc::clone(&nc), peer, message);
406
407        metric_ckb_message_bytes(
408            MetricDirection::In,
409            &SupportProtocols::Sync.name(),
410            item_name,
411            Some(status.code()),
412            item_bytes,
413        );
414
415        post_sync_process(nc.as_ref(), peer, item_name, status);
416    }
417
418    /// Get peers info
419    pub fn peers(&self) -> &Peers {
420        self.shared().state().peers()
421    }
422
423    fn better_tip_header(&self) -> HeaderIndexView {
424        let (header, total_difficulty) = {
425            let active_chain = self.shared.active_chain();
426            (
427                active_chain.tip_header(),
428                active_chain.total_difficulty().to_owned(),
429            )
430        };
431        let best_known = self.shared.state().shared_best_header();
432        // is_better_chain
433        if total_difficulty > *best_known.total_difficulty() {
434            (header, total_difficulty).into()
435        } else {
436            best_known
437        }
438    }
439
440    /// Process a new block sync from other peer
441    //TODO: process block which we don't request
442    pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
443        let block_hash = remote_block.block.hash();
444        let status = self.shared.active_chain().get_block_status(&block_hash);
445        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
446        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
447        if status.contains(BlockStatus::BLOCK_STORED) {
448            error!("Block {} already stored", block_hash);
449        } else if status.contains(BlockStatus::HEADER_VALID) {
450            self.shared.accept_remote_block(&self.chain, remote_block);
451        } else {
452            debug!(
453                "Synchronizer process_new_block unexpected status {:?} {}",
454                status, block_hash,
455            );
456            // TODO which error should we return?
457        }
458    }
459
460    /// Process new block in blocking way
461    #[cfg(test)]
462    pub fn blocking_process_new_block(
463        &self,
464        block: core::BlockView,
465        _peer_id: PeerIndex,
466    ) -> Result<bool, ckb_error::Error> {
467        let block_hash = block.hash();
468        let status = self.shared.active_chain().get_block_status(&block_hash);
469        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
470        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
471        if status.contains(BlockStatus::BLOCK_STORED) {
472            error!("block {} already stored", block_hash);
473            Ok(false)
474        } else if status.contains(BlockStatus::HEADER_VALID) {
475            self.chain.blocking_process_block(Arc::new(block))
476        } else {
477            debug!(
478                "Synchronizer process_new_block unexpected status {:?} {}",
479                status, block_hash,
480            );
481            // TODO while error should we return?
482            Ok(false)
483        }
484    }
485
486    /// Get blocks to fetch
487    pub fn get_blocks_to_fetch(
488        &self,
489        peer: PeerIndex,
490        ibd: IBDState,
491    ) -> Option<Vec<Vec<packed::Byte32>>> {
492        BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
493    }
494
495    pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
496        let pid = SupportProtocols::Sync.protocol_id();
497        let (is_outbound, is_whitelist, is_2023edition) = nc
498            .get_peer(peer)
499            .map(|peer| {
500                (
501                    peer.is_outbound(),
502                    peer.is_whitelist,
503                    peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
504                )
505            })
506            .unwrap_or((false, false, false));
507
508        self.peers()
509            .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
510    }
511
512    /// Regularly check and eject some nodes that do not respond in time
513    //   - If at timeout their best known block now has more work than our tip
514    //     when the timeout was set, then either reset the timeout or clear it
515    //     (after comparing against our current tip's work)
516    //   - If at timeout their best known block still has less work than our
517    //     tip did when the timeout was set, then send a getheaders message,
518    //     and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future.
519    //     If their best known block is still behind when that new timeout is
520    //     reached, disconnect.
521    pub fn eviction(&self, nc: &dyn CKBProtocolContext) {
522        let active_chain = self.shared.active_chain();
523        let mut eviction = Vec::new();
524        let better_tip_header = self.better_tip_header();
525        for mut kv_pair in self.peers().state.iter_mut() {
526            let (peer, state) = kv_pair.pair_mut();
527            let now = unix_time_as_millis();
528
529            if let Some(ref mut controller) = state.headers_sync_controller {
530                let better_tip_ts = better_tip_header.timestamp();
531                if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
532                    if is_timeout {
533                        eviction.push(*peer);
534                        continue;
535                    }
536                } else {
537                    active_chain.send_getheaders_to_peer(
538                        nc,
539                        *peer,
540                        better_tip_header.number_and_hash(),
541                    );
542                }
543            }
544
545            // On ibd, node should only have one peer to sync headers, and it's state can control by
546            // headers_sync_controller.
547            //
548            // The header sync of other nodes does not matter in the ibd phase, and parallel synchronization
549            // can be enabled by unknown list, so there is no need to repeatedly download headers with
550            // multiple nodes at the same time.
551            if active_chain.is_initial_block_download() {
552                continue;
553            }
554            if state.peer_flags.is_outbound {
555                let best_known_header = state.best_known_header.as_ref();
556                let (tip_header, local_total_difficulty) = {
557                    (
558                        active_chain.tip_header().to_owned(),
559                        active_chain.total_difficulty().to_owned(),
560                    )
561                };
562                if best_known_header
563                    .map(|header_index| header_index.total_difficulty().clone())
564                    .unwrap_or_default()
565                    >= local_total_difficulty
566                {
567                    if state.chain_sync.timeout != 0 {
568                        state.chain_sync.timeout = 0;
569                        state.chain_sync.work_header = None;
570                        state.chain_sync.total_difficulty = None;
571                        state.chain_sync.sent_getheaders = false;
572                    }
573                } else if state.chain_sync.timeout == 0
574                    || (best_known_header.is_some()
575                        && best_known_header
576                            .map(|header_index| header_index.total_difficulty().clone())
577                            >= state.chain_sync.total_difficulty)
578                {
579                    // Our best block known by this peer is behind our tip, and we're either noticing
580                    // that for the first time, OR this peer was able to catch up to some earlier point
581                    // where we checked against our tip.
582                    // Either way, set a new timeout based on current tip.
583                    state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
584                    state.chain_sync.work_header = Some(tip_header);
585                    state.chain_sync.total_difficulty = Some(local_total_difficulty);
586                    state.chain_sync.sent_getheaders = false;
587                } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
588                    // No evidence yet that our peer has synced to a chain with work equal to that
589                    // of our tip, when we first detected it was behind. Send a single getheaders
590                    // message to give the peer a chance to update us.
591                    if state.chain_sync.sent_getheaders {
592                        if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
593                            if state.sync_started() {
594                                self.shared().state().suspend_sync(state);
595                            }
596                        } else {
597                            eviction.push(*peer);
598                        }
599                    } else {
600                        state.chain_sync.sent_getheaders = true;
601                        state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
602                        active_chain.send_getheaders_to_peer(
603                            nc,
604                            *peer,
605                            state
606                                .chain_sync
607                                .work_header
608                                .as_ref()
609                                .expect("work_header be assigned")
610                                .into(),
611                        );
612                    }
613                }
614            }
615        }
616        for peer in eviction {
617            info!("Timeout eviction peer={}", peer);
618            if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
619                debug!("synchronizer disconnect error: {:?}", err);
620            }
621        }
622    }
623
624    fn start_sync_headers(&self, nc: &dyn CKBProtocolContext) {
625        let now = unix_time_as_millis();
626        let active_chain = self.shared.active_chain();
627        let ibd = active_chain.is_initial_block_download();
628        let peers: Vec<PeerIndex> = self
629            .peers()
630            .state
631            .iter()
632            .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
633            .map(|kv_pair| *kv_pair.key())
634            .collect();
635
636        if peers.is_empty() {
637            return;
638        }
639
640        let tip = self.better_tip_header();
641
642        for peer in peers {
643            // Only sync with 1 peer if we're in IBD
644            if self
645                .shared()
646                .state()
647                .n_sync_started()
648                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
649                    if ibd && x != 0 { None } else { Some(x + 1) }
650                })
651                .is_err()
652            {
653                break;
654            }
655            {
656                if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
657                    peer_state.start_sync(HeadersSyncController::from_header(&tip));
658                }
659            }
660
661            debug!("Start sync peer={}", peer);
662            active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
663        }
664    }
665
666    fn get_peers_to_fetch(
667        &self,
668        ibd: IBDState,
669        disconnect_list: &HashSet<PeerIndex>,
670    ) -> Vec<PeerIndex> {
671        trace!("Poll find_blocks_to_fetch selecting peers");
672        let state = &self
673            .shared
674            .state()
675            .read_inflight_blocks()
676            .download_schedulers;
677        let mut peers: Vec<PeerIndex> = self
678            .peers()
679            .state
680            .iter()
681            .filter(|kv_pair| {
682                let (id, state) = kv_pair.pair();
683                if disconnect_list.contains(id) {
684                    return false;
685                };
686                match ibd {
687                    IBDState::In => {
688                        state.peer_flags.is_outbound
689                            || state.peer_flags.is_whitelist
690                            || state.peer_flags.is_protect
691                    }
692                    IBDState::Out => state.started_or_tip_synced(),
693                }
694            })
695            .map(|kv_pair| *kv_pair.key())
696            .collect();
697        peers.sort_by_key(|id| {
698            ::std::cmp::Reverse(
699                state
700                    .get(id)
701                    .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
702            )
703        });
704        peers
705    }
706
707    fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) {
708        if self.chain.is_verifying_unverified_blocks_on_startup() {
709            trace!(
710                "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
711            );
712            return;
713        }
714
715        if ckb_stop_handler::has_received_stop_signal() {
716            info!("received stop signal, stop find_blocks_to_fetch");
717            return;
718        }
719
720        let unverified_tip = self.shared.active_chain().unverified_tip_number();
721
722        let disconnect_list = {
723            let mut list = self
724                .shared()
725                .state()
726                .write_inflight_blocks()
727                .prune(unverified_tip);
728            if let IBDState::In = ibd {
729                // best known < tip and in IBD state, and unknown list is empty,
730                // these node can be disconnect
731                list.extend(
732                    self.shared
733                        .state()
734                        .peers()
735                        .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
736                )
737            };
738            list
739        };
740
741        for peer in disconnect_list.iter() {
742            // It is not forbidden to evict protected nodes:
743            // - First of all, this node is not designated by the user for protection,
744            //   but is connected randomly. It does not represent the will of the user
745            // - Secondly, in the synchronization phase, the nodes with zero download tasks are
746            //   retained, apart from reducing the download efficiency, there is no benefit.
747            if self
748                .peers()
749                .get_flag(*peer)
750                .map(|flag| flag.is_whitelist)
751                .unwrap_or(false)
752            {
753                continue;
754            }
755            if let Err(err) = nc.disconnect(*peer, "sync disconnect") {
756                debug!("synchronizer disconnect error: {:?}", err);
757            }
758        }
759
760        // fetch use a lot of cpu time, especially in ibd state
761        // so, the fetch function use another thread
762        match nc.p2p_control() {
763            Some(raw) => match self.fetch_channel {
764                Some(ref sender) => {
765                    if !sender.is_full() {
766                        let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
767                        let _ignore = sender.try_send(FetchCMD {
768                            peers,
769                            ibd_state: ibd,
770                        });
771                    }
772                }
773                None => {
774                    let p2p_control = raw.clone();
775                    let (sender, recv) = channel::bounded(2);
776                    let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
777                    sender
778                        .send(FetchCMD {
779                            peers,
780                            ibd_state: ibd,
781                        })
782                        .unwrap();
783                    self.fetch_channel = Some(sender);
784                    let thread = ::std::thread::Builder::new();
785                    let number = self.shared.state().shared_best_header_ref().number();
786                    const THREAD_NAME: &str = "BlockDownload";
787                    let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
788                    let blockdownload_jh = thread
789                        .name(THREAD_NAME.into())
790                        .spawn(move || {
791                            let stop_signal = new_crossbeam_exit_rx();
792                            BlockFetchCMD {
793                                sync_shared,
794                                p2p_control,
795                                recv,
796                                number,
797                                can_start: CanStart::MinWorkNotReach,
798                                start_timestamp: unix_time_as_millis(),
799                            }
800                            .run(stop_signal);
801                        })
802                        .expect("download thread can't start");
803                    register_thread(THREAD_NAME, blockdownload_jh);
804                }
805            },
806            None => {
807                for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
808                    if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
809                        for item in fetch {
810                            self.send_getblocks(item, nc, peer);
811                        }
812                    }
813                }
814            }
815        }
816    }
817
818    fn send_getblocks(
819        &self,
820        v_fetch: Vec<packed::Byte32>,
821        nc: &dyn CKBProtocolContext,
822        peer: PeerIndex,
823    ) {
824        let content = packed::GetBlocks::new_builder()
825            .block_hashes(v_fetch.clone())
826            .build();
827        let message = packed::SyncMessage::new_builder().set(content).build();
828
829        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
830        let _status = send_message_to(nc, peer, &message);
831    }
832}
833
834#[async_trait]
835impl CKBProtocolHandler for Synchronizer {
836    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
837        // NOTE: 100ms is what bitcoin use.
838        nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
839            .await
840            .expect("set_notify at init is ok");
841        nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
842            .await
843            .expect("set_notify at init is ok");
844        nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
845            .await
846            .expect("set_notify at init is ok");
847        nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
848            .await
849            .expect("set_notify at init is ok");
850        nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
851            .await
852            .expect("set_notify at init is ok");
853    }
854
855    async fn received(
856        &mut self,
857        nc: Arc<dyn CKBProtocolContext + Sync>,
858        peer_index: PeerIndex,
859        data: Bytes,
860    ) {
861        let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
862            Ok(msg) => {
863                let item = msg.to_enum();
864                if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
865                    if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
866                        info!(
867                            "A malformed message from peer {}: \
868                             excessive fields detected in SendBlock",
869                            peer_index
870                        );
871                        nc.ban_peer(
872                            peer_index,
873                            BAD_MESSAGE_BAN_TIME,
874                            String::from(
875                                "send us a malformed message: \
876                                 too many fields in SendBlock",
877                            ),
878                        );
879                        return;
880                    } else {
881                        item
882                    }
883                } else {
884                    match packed::SyncMessageReader::from_slice(&data) {
885                        Ok(msg) => msg.to_enum(),
886                        _ => {
887                            info!(
888                                "A malformed message from peer {}: \
889                                 excessive fields",
890                                peer_index
891                            );
892                            nc.ban_peer(
893                                peer_index,
894                                BAD_MESSAGE_BAN_TIME,
895                                String::from(
896                                    "send us a malformed message: \
897                                     too many fields",
898                                ),
899                            );
900                            return;
901                        }
902                    }
903                }
904            }
905            _ => {
906                info!("A malformed message from peer {}", peer_index);
907                nc.ban_peer(
908                    peer_index,
909                    BAD_MESSAGE_BAN_TIME,
910                    String::from("send us a malformed message"),
911                );
912                return;
913            }
914        };
915
916        debug!("Received msg {} from {}", msg.item_name(), peer_index);
917        #[cfg(feature = "with_sentry")]
918        {
919            let sentry_hub = sentry::Hub::current();
920            let _scope_guard = sentry_hub.push_scope();
921            sentry_hub.configure_scope(|scope| {
922                scope.set_tag("p2p.protocol", "synchronizer");
923                scope.set_tag("p2p.message", msg.item_name());
924            });
925        }
926
927        let start_time = Instant::now();
928        tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
929        debug!(
930            "Process message={}, peer={}, cost={:?}",
931            msg.item_name(),
932            peer_index,
933            Instant::now().saturating_duration_since(start_time),
934        );
935    }
936
937    async fn connected(
938        &mut self,
939        nc: Arc<dyn CKBProtocolContext + Sync>,
940        peer_index: PeerIndex,
941        _version: &str,
942    ) {
943        info!("SyncProtocol.connected peer={}", peer_index);
944        self.on_connected(nc.as_ref(), peer_index);
945    }
946
947    async fn disconnected(
948        &mut self,
949        _nc: Arc<dyn CKBProtocolContext + Sync>,
950        peer_index: PeerIndex,
951    ) {
952        let sync_state = self.shared().state();
953        sync_state.disconnected(peer_index);
954        info!("SyncProtocol.disconnected peer={}", peer_index);
955    }
956
957    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
958        if !self.peers().state.is_empty() {
959            let start_time = Instant::now();
960            trace!("Start notify token={}", token);
961            match token {
962                SEND_GET_HEADERS_TOKEN => {
963                    self.start_sync_headers(nc.as_ref());
964                }
965                IBD_BLOCK_FETCH_TOKEN => {
966                    if self.shared.active_chain().is_initial_block_download() {
967                        self.find_blocks_to_fetch(nc.as_ref(), IBDState::In);
968                    } else {
969                        {
970                            self.shared.state().write_inflight_blocks().adjustment = false;
971                        }
972                        self.shared.state().peers().clear_unknown_list();
973                        if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
974                            trace!("Ibd block fetch token removal failed");
975                        }
976                    }
977                }
978                NOT_IBD_BLOCK_FETCH_TOKEN => {
979                    if !self.shared.active_chain().is_initial_block_download() {
980                        self.find_blocks_to_fetch(nc.as_ref(), IBDState::Out);
981                    }
982                }
983                TIMEOUT_EVICTION_TOKEN => {
984                    self.eviction(nc.as_ref());
985                }
986                // Here is just for NO_PEER_CHECK_TOKEN token, only handle it when there is no peer.
987                _ => {}
988            }
989
990            trace!(
991                "Finished notify token={} cost={:?}",
992                token,
993                Instant::now().saturating_duration_since(start_time)
994            );
995        } else if token == NO_PEER_CHECK_TOKEN {
996            debug!("No peers connected");
997        }
998    }
999}