ckb_sync/synchronizer/
mod.rs

//! A CKB node has an initial block download phase (IBD mode), like Bitcoin:
2//! <https://btcinformation.org/en/glossary/initial-block-download>
3//!
//! While a CKB node is in IBD mode, it responds to `GetHeaders` and `GetBlocks` requests with `packed::InIBD`.
5//!
//! CKB also uses a headers-first synchronization style, like Bitcoin:
7//! <https://btcinformation.org/en/glossary/headers-first-sync>
8//!
9mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{post_sync_process, HeadersSyncController, IBDState, Peers, SyncShared};
24use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{select, Receiver};
31use ckb_constant::sync::{
32    BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33    INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38    async_trait, bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex,
39    ServiceControl, SupportProtocols,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48    core::BlockNumber,
49    packed::{self, Byte32},
50    prelude::*,
51};
52use std::{
53    collections::HashSet,
54    sync::{atomic::Ordering, Arc},
55    time::{Duration, Instant},
56};
57
// Notification tokens of the sync protocol.
// NOTE(review): presumably these are the timer/notify tokens registered with the
// protocol context; the registration site is not visible here — confirm.
pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
pub const NO_PEER_CHECK_TOKEN: u64 = 255;

// Scheduling intervals.
// NOTE(review): presumably the periods at which the tokens above are fired — confirm
// against the code that sets the notifications up.
const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
/// Progress of the block-download thread towards being allowed to request blocks.
///
/// The state is re-evaluated by `BlockFetchCMD::can_start` on every fetch command
/// until it becomes `Ready`, after which it is returned as-is.
#[derive(Copy, Clone)]
enum CanStart {
    /// The assume-valid target header was found in the header map; blocks are
    /// fetched up to the contained block number.
    FetchToTarget(BlockNumber),
    /// No assume-valid restriction applies; blocks are fetched without an upper bound.
    Ready,
    /// The minimum chain work requirement has not been reached yet; no blocks are fetched.
    MinWorkNotReach,
    /// The minimum chain work is reached, but the assume-valid target header has not
    /// been found in the header map yet; no blocks are fetched.
    AssumeValidNotFound,
}
75
/// A fetch request sent to the block-download thread.
struct FetchCMD {
    /// Peers to request blocks from; they are processed in this order.
    peers: Vec<PeerIndex>,
    /// IBD state handed to the `BlockFetcher` created for each peer.
    ibd_state: IBDState,
}
80
/// State owned by the block-download thread, which turns `FetchCMD`s into
/// `GetBlocks` messages.
struct BlockFetchCMD {
    /// Shared synchronizer state (chain state, header map, peers, ...).
    sync_shared: Arc<SyncShared>,
    /// Network handle used to send `GetBlocks` messages to peers.
    p2p_control: ServiceControl,
    /// Receiving end of the command channel.
    recv: channel::Receiver<FetchCMD>,
    /// Cached result of the last `can_start` evaluation.
    can_start: CanStart,
    /// Best known header number recorded when the last progress line was logged.
    number: BlockNumber,
    /// Timestamp captured via `unix_time_as_millis()` when this value was created.
    start_timestamp: u64,
}
89
90impl BlockFetchCMD {
91    fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92        let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94        let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95            for peer in peers {
96                if ckb_stop_handler::has_received_stop_signal() {
97                    return;
98                }
99
100                let mut fetch_end: BlockNumber = u64::MAX;
101                if assume_target != 0 {
102                    fetch_end = assume_target
103                }
104
105                if let Some(fetch) =
106                    BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107                        .fetch(fetch_end)
108                {
109                    for item in fetch {
110                        if ckb_stop_handler::has_received_stop_signal() {
111                            return;
112                        }
113                        BlockFetchCMD::send_getblocks(item, &cmd.p2p_control, peer);
114                    }
115                }
116            }
117        };
118
119        match self.can_start() {
120            CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
121            CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
122            CanStart::MinWorkNotReach => {
123                let best_known = self.sync_shared.state().shared_best_header_ref();
124                let number = best_known.number();
125                if number != self.number && (number - self.number) % 10000 == 0 {
126                    self.number = number;
127                    info!(
128                            "The current best known header number: {}, total difficulty: {:#x}. \
129                                 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
130                            number,
131                            best_known.total_difficulty(),
132                            self.sync_shared.state().min_chain_work()
133                        );
134                }
135            }
136            CanStart::AssumeValidNotFound => {
137                let state = self.sync_shared.state();
138                let shared = self.sync_shared.shared();
139                let best_known = state.shared_best_header_ref();
140                let number = best_known.number();
141                let assume_valid_target: Byte32 = shared
142                    .assume_valid_targets()
143                    .as_ref()
144                    .and_then(|targets| targets.first())
145                    .map(Pack::pack)
146                    .expect("assume valid target must exist");
147
148                if number != self.number && (number - self.number) % 10000 == 0 {
149                    self.number = number;
150                    let remaining_headers_sync_log = self.reaming_headers_sync_log();
151
152                    info!(
153                        "best known header {}-{}, \
154                                 CKB is syncing to latest Header to find the assume valid target: {}. \
155                                 Please wait. {}",
156                        number,
157                        best_known.hash(),
158                        assume_valid_target,
159                        remaining_headers_sync_log
160                    );
161                }
162            }
163        }
164    }
165
166    fn reaming_headers_sync_log(&self) -> String {
167        if let Some(remaining_headers_needed) = self.calc_time_need_to_reach_latest_tip_header() {
168            format!(
169                "Need {} minutes to sync to the latest Header.",
170                remaining_headers_needed.as_secs() / 60
171            )
172        } else {
173            "".to_string()
174        }
175    }
176
177    // Timeline:
178    //
179    // |-------------------|--------------------------------|------------|---->
180    // Genesis  (shared best timestamp)                     |           now
181    // |                   |                                |            |
182    // |             (Sync point)                  (CKB process start)   |
183    // |                   |                                             |
184    // |--Synced Part------|------------ Remain to Sync -----------------|
185    // |                                                                 |
186    // |------------------- CKB Chain Age -------------------------------|
187    //
188    fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
189        let genesis_timestamp = self
190            .sync_shared
191            .consensus()
192            .genesis_block()
193            .header()
194            .timestamp();
195        let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
196
197        let ckb_process_start_timestamp = self.start_timestamp;
198
199        let now_timestamp = unix_time_as_millis();
200
201        let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)?;
202
203        let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)?;
204
205        let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)?;
206
207        let ckb_sync_header_speed = has_synced_headers_age.checked_div(ckb_process_age)?;
208
209        let sync_all_headers_timecost = ckb_chain_age.checked_div(ckb_sync_header_speed)?;
210
211        let sync_remaining_headers_needed =
212            sync_all_headers_timecost.checked_sub(ckb_process_age)?;
213
214        Some(Duration::from_millis(sync_remaining_headers_needed))
215    }
216
217    fn run(&mut self, stop_signal: Receiver<()>) {
218        loop {
219            select! {
220                recv(self.recv) -> msg => {
221                    if let Ok(cmd) = msg {
222                        self.process_fetch_cmd(cmd)
223                    }
224                }
225                recv(stop_signal) -> _ => {
226                    info!("BlockDownload received exit signal, exit now");
227                    return;
228                }
229            }
230        }
231    }
232
    /// Re-evaluates and returns whether block download may start.
    ///
    /// The result is cached in `self.can_start`. Once it is `Ready` it is returned
    /// without any further checks. Otherwise the state advances as follows:
    ///
    /// * `MinWorkNotReach` becomes `AssumeValidNotFound` once
    ///   `min_chain_work_ready()` is true, and the assume-valid check runs right away.
    /// * `AssumeValidNotFound` / `FetchToTarget` run the assume-valid check, which may
    ///   move the state to `FetchToTarget(number)` or `Ready`.
    fn can_start(&mut self) -> CanStart {
        if let CanStart::Ready = self.can_start {
            return self.can_start;
        }

        let shared = self.sync_shared.shared();
        let state = self.sync_shared.state();

        // Advances past `MinWorkNotReach` once the minimum chain work is ready.
        let min_work_reach = |flag: &mut CanStart| {
            if state.min_chain_work_ready() {
                *flag = CanStart::AssumeValidNotFound;
            }
        };

        // Looks up the first assume-valid target in the header map and updates `flag`.
        let assume_valid_target_find = |flag: &mut CanStart| {
            let mut assume_valid_targets = shared.assume_valid_targets();
            if let Some(ref targets) = *assume_valid_targets {
                if targets.is_empty() {
                    // Nothing left to wait for: clear the option and start unrestricted.
                    assume_valid_targets.take();
                    *flag = CanStart::Ready;
                    return;
                }
                let first_target = targets
                    .first()
                    .expect("has checked targets is not empty, assume valid target must exist");
                match shared.header_map().get(&first_target.pack()) {
                    Some(header) => {
                        if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
                        {
                            // BlockFetchCMD has set the fetch target, no need to set it again
                        } else {
                            *flag = CanStart::FetchToTarget(header.number());
                            info!("assume valid target found in header_map; CKB will start fetch blocks to {:?} now", header.number_and_hash());
                        }
                        // Blocks that are no longer in the scope of ibd must be forced to verify.
                        // Clearing the targets makes the next evaluation take the
                        // "no targets" path and switch to `Ready`.
                        if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
                            assume_valid_targets.take();
                            warn!("the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on");
                        }
                    }
                    None => {
                        // Best known already not in the scope of ibd, it means target is invalid
                        if unix_time_as_millis()
                            .saturating_sub(state.shared_best_header_ref().timestamp())
                            < MAX_TIP_AGE
                        {
                            warn!("the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on");
                            *flag = CanStart::Ready;
                            assume_valid_targets.take();
                        }
                    }
                }
            } else {
                // No assume-valid targets configured (or they were cleared earlier).
                *flag = CanStart::Ready;
            }
        };

        match self.can_start {
            CanStart::FetchToTarget(_) => {
                assume_valid_target_find(&mut self.can_start);
                self.can_start
            }
            // Already handled by the early return above; kept for exhaustiveness.
            CanStart::Ready => self.can_start,
            CanStart::MinWorkNotReach => {
                min_work_reach(&mut self.can_start);
                if let CanStart::AssumeValidNotFound = self.can_start {
                    assume_valid_target_find(&mut self.can_start);
                }
                self.can_start
            }
            CanStart::AssumeValidNotFound => {
                assume_valid_target_find(&mut self.can_start);
                self.can_start
            }
        }
    }
309
310    fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: &ServiceControl, peer: PeerIndex) {
311        let content = packed::GetBlocks::new_builder()
312            .block_hashes(v_fetch.clone().pack())
313            .build();
314        let message = packed::SyncMessage::new_builder().set(content).build();
315
316        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
317        if let Err(err) = nc.send_message_to(
318            peer,
319            SupportProtocols::Sync.protocol_id(),
320            message.as_bytes(),
321        ) {
322            debug!("synchronizer sending GetBlocks error: {:?}", err);
323        }
324    }
325}
326
/// Sync protocol handle
pub struct Synchronizer {
    /// Controller used to hand blocks over to the chain service.
    pub(crate) chain: ChainController,
    /// Sync shared state
    pub shared: Arc<SyncShared>,
    /// Sender side of the command channel to the block-download thread; `None`
    /// until that thread has been created.
    fetch_channel: Option<channel::Sender<FetchCMD>>,
}
334
335impl Synchronizer {
336    /// Init sync protocol handle
337    ///
338    /// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
339    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
340        Synchronizer {
341            chain,
342            shared,
343            fetch_channel: None,
344        }
345    }
346
    /// Returns a reference to the sync shared state held by this handle.
    pub fn shared(&self) -> &Arc<SyncShared> {
        &self.shared
    }
351
352    fn try_process(
353        &self,
354        nc: Arc<dyn CKBProtocolContext + Sync>,
355        peer: PeerIndex,
356        message: packed::SyncMessageUnionReader<'_>,
357    ) -> Status {
358        let _trace_timecost: Option<HistogramTimer> = {
359            ckb_metrics::handle().map(|handle| {
360                handle
361                    .ckb_sync_msg_process_duration
362                    .with_label_values(&[message.item_name()])
363                    .start_timer()
364            })
365        };
366
367        match message {
368            packed::SyncMessageUnionReader::GetHeaders(reader) => {
369                GetHeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
370            }
371            packed::SyncMessageUnionReader::SendHeaders(reader) => {
372                HeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
373            }
374            packed::SyncMessageUnionReader::GetBlocks(reader) => {
375                GetBlocksProcess::new(reader, self, peer, nc.as_ref()).execute()
376            }
377            packed::SyncMessageUnionReader::SendBlock(reader) => {
378                if reader.check_data() {
379                    BlockProcess::new(reader, self, peer, nc).execute()
380                } else {
381                    StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
382                }
383            }
384            packed::SyncMessageUnionReader::InIBD(_) => {
385                InIBDProcess::new(self, peer, nc.as_ref()).execute()
386            }
387        }
388    }
389
390    fn process(
391        &self,
392        nc: Arc<dyn CKBProtocolContext + Sync>,
393        peer: PeerIndex,
394        message: packed::SyncMessageUnionReader<'_>,
395    ) {
396        let item_name = message.item_name();
397        let item_bytes = message.as_slice().len() as u64;
398        let status = self.try_process(Arc::clone(&nc), peer, message);
399
400        metric_ckb_message_bytes(
401            MetricDirection::In,
402            &SupportProtocols::Sync.name(),
403            item_name,
404            Some(status.code()),
405            item_bytes,
406        );
407
408        post_sync_process(nc.as_ref(), peer, item_name, status);
409    }
410
411    /// Get peers info
412    pub fn peers(&self) -> &Peers {
413        self.shared().state().peers()
414    }
415
416    fn better_tip_header(&self) -> HeaderIndexView {
417        let (header, total_difficulty) = {
418            let active_chain = self.shared.active_chain();
419            (
420                active_chain.tip_header(),
421                active_chain.total_difficulty().to_owned(),
422            )
423        };
424        let best_known = self.shared.state().shared_best_header();
425        // is_better_chain
426        if total_difficulty > *best_known.total_difficulty() {
427            (header, total_difficulty).into()
428        } else {
429            best_known
430        }
431    }
432
433    /// Process a new block sync from other peer
434    //TODO: process block which we don't request
435    pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
436        let block_hash = remote_block.block.hash();
437        let status = self.shared.active_chain().get_block_status(&block_hash);
438        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
439        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
440        if status.contains(BlockStatus::BLOCK_STORED) {
441            error!("Block {} already stored", block_hash);
442        } else if status.contains(BlockStatus::HEADER_VALID) {
443            self.shared.accept_remote_block(&self.chain, remote_block);
444        } else {
445            debug!(
446                "Synchronizer process_new_block unexpected status {:?} {}",
447                status, block_hash,
448            );
449            // TODO which error should we return?
450        }
451    }
452
453    #[cfg(test)]
454    pub fn blocking_process_new_block(
455        &self,
456        block: core::BlockView,
457        _peer_id: PeerIndex,
458    ) -> Result<bool, ckb_error::Error> {
459        let block_hash = block.hash();
460        let status = self.shared.active_chain().get_block_status(&block_hash);
461        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
462        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
463        if status.contains(BlockStatus::BLOCK_STORED) {
464            error!("block {} already stored", block_hash);
465            Ok(false)
466        } else if status.contains(BlockStatus::HEADER_VALID) {
467            self.chain.blocking_process_block(Arc::new(block))
468        } else {
469            debug!(
470                "Synchronizer process_new_block unexpected status {:?} {}",
471                status, block_hash,
472            );
473            // TODO while error should we return?
474            Ok(false)
475        }
476    }
477
478    /// Get blocks to fetch
479    pub fn get_blocks_to_fetch(
480        &self,
481        peer: PeerIndex,
482        ibd: IBDState,
483    ) -> Option<Vec<Vec<packed::Byte32>>> {
484        BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
485    }
486
487    pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
488        let pid = SupportProtocols::Sync.protocol_id();
489        let (is_outbound, is_whitelist, is_2023edition) = nc
490            .get_peer(peer)
491            .map(|peer| {
492                (
493                    peer.is_outbound(),
494                    peer.is_whitelist,
495                    peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
496                )
497            })
498            .unwrap_or((false, false, false));
499
500        self.peers()
501            .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
502    }
503
    /// Regularly check and eject some nodes that do not respond in time
    //   - If at timeout their best known block now has more work than our tip
    //     when the timeout was set, then either reset the timeout or clear it
    //     (after comparing against our current tip's work)
    //   - If at timeout their best known block still has less work than our
    //     tip did when the timeout was set, then send a getheaders message,
    //     and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future.
    //     If their best known block is still behind when that new timeout is
    //     reached, disconnect.
    //
    // Peers selected for eviction are collected first and disconnected only after
    // the iteration over the peer states has finished.
    pub fn eviction(&self, nc: &dyn CKBProtocolContext) {
        let active_chain = self.shared.active_chain();
        let mut eviction = Vec::new();
        let better_tip_header = self.better_tip_header();
        for mut kv_pair in self.peers().state.iter_mut() {
            let (peer, state) = kv_pair.pair_mut();
            let now = unix_time_as_millis();

            // Headers-sync check: `is_timeout` returning `Some(true)` evicts the peer,
            // `Some(false)` leaves it alone, and `None` triggers another getheaders
            // request based on the better tip.
            if let Some(ref mut controller) = state.headers_sync_controller {
                let better_tip_ts = better_tip_header.timestamp();
                if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
                    if is_timeout {
                        eviction.push(*peer);
                        continue;
                    }
                } else {
                    active_chain.send_getheaders_to_peer(
                        nc,
                        *peer,
                        better_tip_header.number_and_hash(),
                    );
                }
            }

            // During IBD the node should sync headers from only one peer, whose state
            // is governed by headers_sync_controller.
            //
            // The header sync of other nodes does not matter in the IBD phase, and parallel
            // synchronization can be enabled by the unknown list, so there is no need to
            // repeatedly download headers from multiple nodes at the same time.
            if active_chain.is_initial_block_download() {
                continue;
            }
            // The chain-sync timeout logic below applies to outbound peers only.
            if state.peer_flags.is_outbound {
                let best_known_header = state.best_known_header.as_ref();
                let (tip_header, local_total_difficulty) = {
                    (
                        active_chain.tip_header().to_owned(),
                        active_chain.total_difficulty().to_owned(),
                    )
                };
                if best_known_header
                    .map(|header_index| header_index.total_difficulty().clone())
                    .unwrap_or_default()
                    >= local_total_difficulty
                {
                    // The peer's best known header has at least as much work as our
                    // tip: clear any pending chain-sync timeout.
                    if state.chain_sync.timeout != 0 {
                        state.chain_sync.timeout = 0;
                        state.chain_sync.work_header = None;
                        state.chain_sync.total_difficulty = None;
                        state.chain_sync.sent_getheaders = false;
                    }
                } else if state.chain_sync.timeout == 0
                    || (best_known_header.is_some()
                        && best_known_header
                            .map(|header_index| header_index.total_difficulty().clone())
                            >= state.chain_sync.total_difficulty)
                {
                    // Our best block known by this peer is behind our tip, and we're either noticing
                    // that for the first time, OR this peer was able to catch up to some earlier point
                    // where we checked against our tip.
                    // Either way, set a new timeout based on current tip.
                    state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
                    state.chain_sync.work_header = Some(tip_header);
                    state.chain_sync.total_difficulty = Some(local_total_difficulty);
                    state.chain_sync.sent_getheaders = false;
                } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
                    // No evidence yet that our peer has synced to a chain with work equal to that
                    // of our tip, when we first detected it was behind. Send a single getheaders
                    // message to give the peer a chance to update us.
                    if state.chain_sync.sent_getheaders {
                        // The second timeout expired as well: protected and whitelisted
                        // peers only get their sync suspended, all others are evicted.
                        if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
                            if state.sync_started() {
                                self.shared().state().suspend_sync(state);
                            }
                        } else {
                            eviction.push(*peer);
                        }
                    } else {
                        state.chain_sync.sent_getheaders = true;
                        state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
                        active_chain.send_getheaders_to_peer(
                            nc,
                            *peer,
                            state
                                .chain_sync
                                .work_header
                                .as_ref()
                                .expect("work_header be assigned")
                                .into(),
                        );
                    }
                }
            }
        }
        for peer in eviction {
            info!("Timeout eviction peer={}", peer);
            if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
                debug!("synchronizer disconnect error: {:?}", err);
            }
        }
    }
615
616    fn start_sync_headers(&self, nc: &dyn CKBProtocolContext) {
617        let now = unix_time_as_millis();
618        let active_chain = self.shared.active_chain();
619        let ibd = active_chain.is_initial_block_download();
620        let peers: Vec<PeerIndex> = self
621            .peers()
622            .state
623            .iter()
624            .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
625            .map(|kv_pair| *kv_pair.key())
626            .collect();
627
628        if peers.is_empty() {
629            return;
630        }
631
632        let tip = self.better_tip_header();
633
634        for peer in peers {
635            // Only sync with 1 peer if we're in IBD
636            if self
637                .shared()
638                .state()
639                .n_sync_started()
640                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
641                    if ibd && x != 0 {
642                        None
643                    } else {
644                        Some(x + 1)
645                    }
646                })
647                .is_err()
648            {
649                break;
650            }
651            {
652                if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
653                    peer_state.start_sync(HeadersSyncController::from_header(&tip));
654                }
655            }
656
657            debug!("Start sync peer={}", peer);
658            active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
659        }
660    }
661
662    fn get_peers_to_fetch(
663        &self,
664        ibd: IBDState,
665        disconnect_list: &HashSet<PeerIndex>,
666    ) -> Vec<PeerIndex> {
667        trace!("Poll find_blocks_to_fetch selecting peers");
668        let state = &self
669            .shared
670            .state()
671            .read_inflight_blocks()
672            .download_schedulers;
673        let mut peers: Vec<PeerIndex> = self
674            .peers()
675            .state
676            .iter()
677            .filter(|kv_pair| {
678                let (id, state) = kv_pair.pair();
679                if disconnect_list.contains(id) {
680                    return false;
681                };
682                match ibd {
683                    IBDState::In => {
684                        state.peer_flags.is_outbound
685                            || state.peer_flags.is_whitelist
686                            || state.peer_flags.is_protect
687                    }
688                    IBDState::Out => state.started_or_tip_synced(),
689                }
690            })
691            .map(|kv_pair| *kv_pair.key())
692            .collect();
693        peers.sort_by_key(|id| {
694            ::std::cmp::Reverse(
695                state
696                    .get(id)
697                    .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
698            )
699        });
700        peers
701    }
702
703    fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) {
704        if self.chain.is_verifying_unverified_blocks_on_startup() {
705            trace!(
706                "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
707            );
708            return;
709        }
710
711        if ckb_stop_handler::has_received_stop_signal() {
712            info!("received stop signal, stop find_blocks_to_fetch");
713            return;
714        }
715
716        let unverified_tip = self.shared.active_chain().unverified_tip_number();
717
718        let disconnect_list = {
719            let mut list = self
720                .shared()
721                .state()
722                .write_inflight_blocks()
723                .prune(unverified_tip);
724            if let IBDState::In = ibd {
725                // best known < tip and in IBD state, and unknown list is empty,
726                // these node can be disconnect
727                list.extend(
728                    self.shared
729                        .state()
730                        .peers()
731                        .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
732                )
733            };
734            list
735        };
736
737        for peer in disconnect_list.iter() {
738            // It is not forbidden to evict protected nodes:
739            // - First of all, this node is not designated by the user for protection,
740            //   but is connected randomly. It does not represent the will of the user
741            // - Secondly, in the synchronization phase, the nodes with zero download tasks are
742            //   retained, apart from reducing the download efficiency, there is no benefit.
743            if self
744                .peers()
745                .get_flag(*peer)
746                .map(|flag| flag.is_whitelist)
747                .unwrap_or(false)
748            {
749                continue;
750            }
751            if let Err(err) = nc.disconnect(*peer, "sync disconnect") {
752                debug!("synchronizer disconnect error: {:?}", err);
753            }
754        }
755
756        // fetch use a lot of cpu time, especially in ibd state
757        // so, the fetch function use another thread
758        match nc.p2p_control() {
759            Some(raw) => match self.fetch_channel {
760                Some(ref sender) => {
761                    if !sender.is_full() {
762                        let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
763                        let _ignore = sender.try_send(FetchCMD {
764                            peers,
765                            ibd_state: ibd,
766                        });
767                    }
768                }
769                None => {
770                    let p2p_control = raw.clone();
771                    let (sender, recv) = channel::bounded(2);
772                    let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
773                    sender
774                        .send(FetchCMD {
775                            peers,
776                            ibd_state: ibd,
777                        })
778                        .unwrap();
779                    self.fetch_channel = Some(sender);
780                    let thread = ::std::thread::Builder::new();
781                    let number = self.shared.state().shared_best_header_ref().number();
782                    const THREAD_NAME: &str = "BlockDownload";
783                    let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
784                    let blockdownload_jh = thread
785                        .name(THREAD_NAME.into())
786                        .spawn(move || {
787                            let stop_signal = new_crossbeam_exit_rx();
788                            BlockFetchCMD {
789                                sync_shared,
790                                p2p_control,
791                                recv,
792                                number,
793                                can_start: CanStart::MinWorkNotReach,
794                                start_timestamp: unix_time_as_millis(),
795                            }
796                            .run(stop_signal);
797                        })
798                        .expect("download thread can't start");
799                    register_thread(THREAD_NAME, blockdownload_jh);
800                }
801            },
802            None => {
803                for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
804                    if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
805                        for item in fetch {
806                            self.send_getblocks(item, nc, peer);
807                        }
808                    }
809                }
810            }
811        }
812    }
813
814    fn send_getblocks(
815        &self,
816        v_fetch: Vec<packed::Byte32>,
817        nc: &dyn CKBProtocolContext,
818        peer: PeerIndex,
819    ) {
820        let content = packed::GetBlocks::new_builder()
821            .block_hashes(v_fetch.clone().pack())
822            .build();
823        let message = packed::SyncMessage::new_builder().set(content).build();
824
825        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
826        let _status = send_message_to(nc, peer, &message);
827    }
828}
829
830#[async_trait]
831impl CKBProtocolHandler for Synchronizer {
832    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
833        // NOTE: 100ms is what bitcoin use.
834        nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
835            .await
836            .expect("set_notify at init is ok");
837        nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
838            .await
839            .expect("set_notify at init is ok");
840        nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
841            .await
842            .expect("set_notify at init is ok");
843        nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
844            .await
845            .expect("set_notify at init is ok");
846        nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
847            .await
848            .expect("set_notify at init is ok");
849    }
850
851    async fn received(
852        &mut self,
853        nc: Arc<dyn CKBProtocolContext + Sync>,
854        peer_index: PeerIndex,
855        data: Bytes,
856    ) {
857        let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
858            Ok(msg) => {
859                let item = msg.to_enum();
860                if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
861                    if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
862                        info!(
863                            "A malformed message from peer {}: \
864                             excessive fields detected in SendBlock",
865                            peer_index
866                        );
867                        nc.ban_peer(
868                            peer_index,
869                            BAD_MESSAGE_BAN_TIME,
870                            String::from(
871                                "send us a malformed message: \
872                                 too many fields in SendBlock",
873                            ),
874                        );
875                        return;
876                    } else {
877                        item
878                    }
879                } else {
880                    match packed::SyncMessageReader::from_slice(&data) {
881                        Ok(msg) => msg.to_enum(),
882                        _ => {
883                            info!(
884                                "A malformed message from peer {}: \
885                                 excessive fields",
886                                peer_index
887                            );
888                            nc.ban_peer(
889                                peer_index,
890                                BAD_MESSAGE_BAN_TIME,
891                                String::from(
892                                    "send us a malformed message: \
893                                     too many fields",
894                                ),
895                            );
896                            return;
897                        }
898                    }
899                }
900            }
901            _ => {
902                info!("A malformed message from peer {}", peer_index);
903                nc.ban_peer(
904                    peer_index,
905                    BAD_MESSAGE_BAN_TIME,
906                    String::from("send us a malformed message"),
907                );
908                return;
909            }
910        };
911
912        debug!("Received msg {} from {}", msg.item_name(), peer_index);
913        #[cfg(feature = "with_sentry")]
914        {
915            let sentry_hub = sentry::Hub::current();
916            let _scope_guard = sentry_hub.push_scope();
917            sentry_hub.configure_scope(|scope| {
918                scope.set_tag("p2p.protocol", "synchronizer");
919                scope.set_tag("p2p.message", msg.item_name());
920            });
921        }
922
923        let start_time = Instant::now();
924        tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
925        debug!(
926            "Process message={}, peer={}, cost={:?}",
927            msg.item_name(),
928            peer_index,
929            Instant::now().saturating_duration_since(start_time),
930        );
931    }
932
933    async fn connected(
934        &mut self,
935        nc: Arc<dyn CKBProtocolContext + Sync>,
936        peer_index: PeerIndex,
937        _version: &str,
938    ) {
939        info!("SyncProtocol.connected peer={}", peer_index);
940        self.on_connected(nc.as_ref(), peer_index);
941    }
942
943    async fn disconnected(
944        &mut self,
945        _nc: Arc<dyn CKBProtocolContext + Sync>,
946        peer_index: PeerIndex,
947    ) {
948        let sync_state = self.shared().state();
949        sync_state.disconnected(peer_index);
950        info!("SyncProtocol.disconnected peer={}", peer_index);
951    }
952
953    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
954        if !self.peers().state.is_empty() {
955            let start_time = Instant::now();
956            trace!("Start notify token={}", token);
957            match token {
958                SEND_GET_HEADERS_TOKEN => {
959                    self.start_sync_headers(nc.as_ref());
960                }
961                IBD_BLOCK_FETCH_TOKEN => {
962                    if self.shared.active_chain().is_initial_block_download() {
963                        self.find_blocks_to_fetch(nc.as_ref(), IBDState::In);
964                    } else {
965                        {
966                            self.shared.state().write_inflight_blocks().adjustment = false;
967                        }
968                        self.shared.state().peers().clear_unknown_list();
969                        if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
970                            trace!("Ibd block fetch token removal failed");
971                        }
972                    }
973                }
974                NOT_IBD_BLOCK_FETCH_TOKEN => {
975                    if !self.shared.active_chain().is_initial_block_download() {
976                        self.find_blocks_to_fetch(nc.as_ref(), IBDState::Out);
977                    }
978                }
979                TIMEOUT_EVICTION_TOKEN => {
980                    self.eviction(nc.as_ref());
981                }
982                // Here is just for NO_PEER_CHECK_TOKEN token, only handle it when there is no peer.
983                _ => {}
984            }
985
986            trace!(
987                "Finished notify token={} cost={:?}",
988                token,
989                Instant::now().saturating_duration_since(start_time)
990            );
991        } else if token == NO_PEER_CHECK_TOKEN {
992            debug!("No peers connected");
993        }
994    }
995}