ckb_sync/synchronizer/
mod.rs

1//! CKB node has initial block download phase (IBD mode) like Bitcoin:
2//! <https://btcinformation.org/en/glossary/initial-block-download>
3//!
4//! When CKB node is in IBD mode, it will respond `packed::InIBD` to `GetHeaders` and `GetBlocks` requests
5//!
6//! And CKB has a headers-first synchronization style like Bitcoin:
7//! <https://btcinformation.org/en/glossary/headers-first-sync>
8//!
9mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, async_send_message_to, metric_ckb_message_bytes};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{Receiver, select};
31use ckb_constant::sync::{
32    BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33    INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38    CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceAsyncControl, ServiceControl,
39    SupportProtocols, async_trait, bytes::Bytes, tokio,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48    core::BlockNumber,
49    packed::{self, Byte32},
50    prelude::*,
51};
52use std::{
53    collections::HashSet,
54    sync::{Arc, atomic::Ordering},
55    time::{Duration, Instant},
56};
57
// Numeric tokens exported by the synchronizer.
// NOTE(review): judging by their names these identify periodic notifications (sending
// `GetHeaders`, fetching blocks in / out of IBD, timeout eviction, no-peer check). The code
// that registers and dispatches them is not in this part of the file — confirm there.
58pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
59pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
60pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
61pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
62pub const NO_PEER_CHECK_TOKEN: u64 = 255;
63
// Fixed intervals: 1 s, 40 ms and 200 ms.
// NOTE(review): presumably the periods the tokens above are scheduled with — confirm at the
// registration site.
64const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
65const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
66const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
/// Gate that decides whether `BlockFetchCMD` may start requesting blocks.
68#[derive(Copy, Clone)]
69enum CanStart {
    /// The first assume-valid target was found in the header map. The payload is that
    /// header's block number; it is handed to the block fetcher as the fetch end.
70    FetchToTarget(BlockNumber),
    /// Nothing gates the download any more; blocks are fetched with `BlockNumber::MAX`
    /// as the fetch end.
71    Ready,
    /// `min_chain_work_ready()` has not returned true yet; no blocks are requested.
72    MinWorkNotReach,
    /// Minimum chain work is ready, but the first assume-valid target has not been found
    /// in the header map yet; no blocks are requested.
73    AssumeValidNotFound,
74}
75
/// Command sent over the fetch channel to the `BlockFetchCMD` worker.
76struct FetchCMD {
    /// Peers to request blocks from, in the order they should be processed.
77    peers: Vec<PeerIndex>,
    /// IBD state captured when the command was created; forwarded to `BlockFetcher::new`.
78    ibd_state: IBDState,
79}
80
/// State of the block-download worker: it receives `FetchCMD`s over `recv` and sends
/// `GetBlocks` requests through `p2p_control`.
81struct BlockFetchCMD {
    /// Shared sync state, used to build `BlockFetcher`s and to read the best known header.
82    sync_shared: Arc<SyncShared>,
    /// Network service handle used to send the `GetBlocks` messages.
83    p2p_control: ServiceControl,
    /// Receiving end of the command channel.
84    recv: channel::Receiver<FetchCMD>,
    /// Cached gate state; re-evaluated by `can_start()` until it becomes `Ready`.
85    can_start: CanStart,
    /// Best known header number recorded at the last progress log.
86    number: BlockNumber,
    /// Treated as the process start time when estimating the remaining header sync time;
    /// it is subtracted from `unix_time_as_millis()`, so it is in Unix milliseconds.
87    start_timestamp: u64,
88}
89
90impl BlockFetchCMD {
91    fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92        let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94        let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95            for peer in peers {
96                if ckb_stop_handler::has_received_stop_signal() {
97                    return;
98                }
99
100                let mut fetch_end: BlockNumber = u64::MAX;
101                if assume_target != 0 {
102                    fetch_end = assume_target
103                }
104
105                if let Some(fetch) =
106                    BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107                        .fetch(fetch_end)
108                {
109                    for item in fetch {
110                        if ckb_stop_handler::has_received_stop_signal() {
111                            return;
112                        }
113                        let ctrl = cmd.p2p_control.clone();
114                        let handle = cmd.sync_shared.shared().async_handle();
115                        handle.spawn(BlockFetchCMD::send_getblocks(item, ctrl, peer));
116                    }
117                }
118            }
119        };
120
121        match self.can_start() {
122            CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
123            CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
124            CanStart::MinWorkNotReach => {
125                let best_known = self.sync_shared.state().shared_best_header_ref();
126                let number = best_known.number();
127                if number != self.number && (number - self.number) % 10000 == 0 {
128                    self.number = number;
129                    info!(
130                        "The current best known header number: {}, total difficulty: {:#x}. \
131                                 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
132                        number,
133                        best_known.total_difficulty(),
134                        self.sync_shared.state().min_chain_work()
135                    );
136                }
137            }
138            CanStart::AssumeValidNotFound => {
139                let state = self.sync_shared.state();
140                let shared = self.sync_shared.shared();
141                let best_known = state.shared_best_header_ref();
142                let number = best_known.number();
143                let assume_valid_target: Byte32 = shared
144                    .assume_valid_targets()
145                    .as_ref()
146                    .and_then(|targets| targets.first())
147                    .map(Pack::pack)
148                    .expect("assume valid target must exist");
149
150                if number != self.number && (number - self.number) % 10000 == 0 {
151                    self.number = number;
152                    let remaining_headers_sync_log = self.reaming_headers_sync_log();
153
154                    info!(
155                        "best known header {}-{}, \
156                                 CKB is syncing to latest Header to find the assume valid target: {}. \
157                                 Please wait. {}",
158                        number,
159                        best_known.hash(),
160                        assume_valid_target,
161                        remaining_headers_sync_log
162                    );
163                }
164            }
165        }
166    }
167
168    fn reaming_headers_sync_log(&self) -> String {
169        if let Some(remaining_headers_needed) = self.calc_time_need_to_reach_latest_tip_header() {
170            format!(
171                "Need {} minutes to sync to the latest Header.",
172                remaining_headers_needed.as_secs() / 60
173            )
174        } else {
175            "".to_string()
176        }
177    }
178
179    // Timeline:
180    //
181    // |-------------------|--------------------------------|------------|---->
182    // Genesis  (shared best timestamp)                     |           now
183    // |                   |                                |            |
184    // |             (Sync point)                  (CKB process start)   |
185    // |                   |                                             |
186    // |--Synced Part------|------------ Remain to Sync -----------------|
187    // |                                                                 |
188    // |------------------- CKB Chain Age -------------------------------|
189    //
190    fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
191        let genesis_timestamp = self
192            .sync_shared
193            .consensus()
194            .genesis_block()
195            .header()
196            .timestamp();
197        let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
198
199        let ckb_process_start_timestamp = self.start_timestamp;
200
201        let now_timestamp = unix_time_as_millis();
202
203        let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)?;
204
205        let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)?;
206
207        let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)?;
208
209        let ckb_sync_header_speed = has_synced_headers_age.checked_div(ckb_process_age)?;
210
211        let sync_all_headers_timecost = ckb_chain_age.checked_div(ckb_sync_header_speed)?;
212
213        let sync_remaining_headers_needed =
214            sync_all_headers_timecost.checked_sub(ckb_process_age)?;
215
216        Some(Duration::from_millis(sync_remaining_headers_needed))
217    }
218
219    fn run(&mut self, stop_signal: Receiver<()>) {
220        loop {
221            select! {
222                recv(self.recv) -> msg => {
223                    if let Ok(cmd) = msg {
224                        self.process_fetch_cmd(cmd)
225                    }
226                }
227                recv(stop_signal) -> _ => {
228                    info!("BlockDownload received exit signal, exit now");
229                    return;
230                }
231            }
232        }
233    }
234
    /// Re-evaluates the cached start gate and returns its (possibly updated) value.
    ///
    /// Once `Ready` has been cached it is returned immediately. Otherwise:
    /// * `MinWorkNotReach` moves to `AssumeValidNotFound` as soon as
    ///   `min_chain_work_ready()` is true, and the assume-valid lookup runs right away.
    /// * `AssumeValidNotFound` / `FetchToTarget` run the assume-valid lookup, which may
    ///   switch the gate to `FetchToTarget(number)` or `Ready`, and may clear the
    ///   configured assume-valid targets.
235    fn can_start(&mut self) -> CanStart {
236        if let CanStart::Ready = self.can_start {
237            return self.can_start;
238        }
239
240        let shared = self.sync_shared.shared();
241        let state = self.sync_shared.state();
242
        // Promotes the gate once the minimum chain work requirement is met.
243        let min_work_reach = |flag: &mut CanStart| {
244            if state.min_chain_work_ready() {
245                *flag = CanStart::AssumeValidNotFound;
246            }
247        };
248
        // Looks the first assume-valid target up in the header map and updates the gate.
249        let assume_valid_target_find = |flag: &mut CanStart| {
250            let mut assume_valid_targets = shared.assume_valid_targets();
251            if let Some(ref targets) = *assume_valid_targets {
                // An empty target list is cleared and treated like "no target configured".
252                if targets.is_empty() {
253                    assume_valid_targets.take();
254                    *flag = CanStart::Ready;
255                    return;
256                }
257                let first_target = targets
258                    .first()
259                    .expect("has checked targets is not empty, assume valid target must exist");
260                match shared.header_map().get(&first_target.into()) {
261                    Some(header) => {
262                        if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
263                        {
264                            // BlockFetchCMD has set the fetch target, no need to set it again
265                        } else {
266                            *flag = CanStart::FetchToTarget(header.number());
267                            info!(
268                                "assume valid target found in header_map; CKB will start fetch blocks to {:?} now",
269                                header.number_and_hash()
270                            );
271                        }
272                        // Blocks that are no longer in the scope of ibd must be forced to verify
                        // The targets are cleared here while the gate stays `FetchToTarget`;
                        // the next call then finds no targets and switches to `Ready`.
273                        if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
274                            assume_valid_targets.take();
275                            warn!(
276                                "the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on"
277                            );
278                        }
279                    }
280                    None => {
281                        // Best known already not in the scope of ibd, it means target is invalid
282                        if unix_time_as_millis()
283                            .saturating_sub(state.shared_best_header_ref().timestamp())
284                            < MAX_TIP_AGE
285                        {
286                            warn!(
287                                "the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on"
288                            );
289                            *flag = CanStart::Ready;
290                            assume_valid_targets.take();
291                        }
                        // Otherwise the gate is left unchanged and the lookup is retried on
                        // the next call.
292                    }
293                }
294            } else {
                // No assume-valid targets configured (or already cleared).
295                *flag = CanStart::Ready;
296            }
297        };
298
299        match self.can_start {
300            CanStart::FetchToTarget(_) => {
301                assume_valid_target_find(&mut self.can_start);
302                self.can_start
303            }
            // Already handled by the early return above; kept so the match is exhaustive.
304            CanStart::Ready => self.can_start,
305            CanStart::MinWorkNotReach => {
306                min_work_reach(&mut self.can_start);
307                if let CanStart::AssumeValidNotFound = self.can_start {
308                    assume_valid_target_find(&mut self.can_start);
309                }
310                self.can_start
311            }
312            CanStart::AssumeValidNotFound => {
313                assume_valid_target_find(&mut self.can_start);
314                self.can_start
315            }
316        }
317    }
318
319    async fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: ServiceControl, peer: PeerIndex) {
320        let content = packed::GetBlocks::new_builder()
321            .block_hashes(v_fetch.clone())
322            .build();
323        let message = packed::SyncMessage::new_builder().set(content).build();
324
325        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
326        if let Err(err) = Into::<ServiceAsyncControl>::into(nc)
327            .send_message_to(
328                peer,
329                SupportProtocols::Sync.protocol_id(),
330                message.as_bytes(),
331            )
332            .await
333        {
334            debug!("synchronizer sending GetBlocks error: {:?}", err);
335        }
336    }
337}
338
339/// Sync protocol handle
340pub struct Synchronizer {
    /// Chain controller; handed to `SyncShared::accept_remote_block` for received blocks.
341    pub(crate) chain: ChainController,
342    /// Sync shared state
343    pub shared: Arc<SyncShared>,
    /// Sender for `FetchCMD`s to the block-download worker. Starts as `None`; the channel
    /// is created by `find_blocks_to_fetch` the first time it is needed.
344    fetch_channel: Option<channel::Sender<FetchCMD>>,
345}
346
347impl Synchronizer {
348    /// Init sync protocol handle
349    ///
350    /// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
351    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
352        Synchronizer {
353            chain,
354            shared,
355            fetch_channel: None,
356        }
357    }
358
359    /// Returns a reference to the `Arc` holding the sync shared state.
360    pub fn shared(&self) -> &Arc<SyncShared> {
361        &self.shared
362    }
363
364    async fn try_process(
365        &self,
366        nc: Arc<dyn CKBProtocolContext + Sync>,
367        peer: PeerIndex,
368        message: packed::SyncMessageUnionReader<'_>,
369    ) -> Status {
370        let _trace_timecost: Option<HistogramTimer> = {
371            ckb_metrics::handle().map(|handle| {
372                handle
373                    .ckb_sync_msg_process_duration
374                    .with_label_values(&[message.item_name()])
375                    .start_timer()
376            })
377        };
378
379        match message {
380            packed::SyncMessageUnionReader::GetHeaders(reader) => {
381                tokio::task::block_in_place(|| {
382                    GetHeadersProcess::new(reader, self, peer, &nc).execute()
383                })
384            }
385            packed::SyncMessageUnionReader::SendHeaders(reader) => {
386                tokio::task::block_in_place(|| {
387                    HeadersProcess::new(reader, self, peer, &nc).execute()
388                })
389            }
390            packed::SyncMessageUnionReader::GetBlocks(reader) => {
391                tokio::task::block_in_place(|| {
392                    GetBlocksProcess::new(reader, self, peer, &nc).execute()
393                })
394            }
395            packed::SyncMessageUnionReader::SendBlock(reader) => {
396                if reader.check_data() {
397                    BlockProcess::new(reader, self, peer, nc).execute()
398                } else {
399                    StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
400                }
401            }
402            packed::SyncMessageUnionReader::InIBD(_) => {
403                InIBDProcess::new(self, peer, &nc).execute().await
404            }
405        }
406    }
407
408    async fn process(
409        &self,
410        nc: Arc<dyn CKBProtocolContext + Sync>,
411        peer: PeerIndex,
412        message: packed::SyncMessageUnionReader<'_>,
413    ) {
414        let item_name = message.item_name();
415        let item_bytes = message.as_slice().len() as u64;
416        let status = self.try_process(Arc::clone(&nc), peer, message).await;
417
418        metric_ckb_message_bytes(
419            MetricDirection::In,
420            &SupportProtocols::Sync.name(),
421            item_name,
422            Some(status.code()),
423            item_bytes,
424        );
425
426        post_sync_process(nc.as_ref(), peer, item_name, status);
427    }
428
429    /// Returns the `Peers` collection held by the sync state (`shared().state().peers()`).
430    pub fn peers(&self) -> &Peers {
431        self.shared().state().peers()
432    }
433
434    fn better_tip_header(&self) -> HeaderIndexView {
435        let (header, total_difficulty) = {
436            let active_chain = self.shared.active_chain();
437            (
438                active_chain.tip_header(),
439                active_chain.total_difficulty().to_owned(),
440            )
441        };
442        let best_known = self.shared.state().shared_best_header();
443        // is_better_chain
444        if total_difficulty > *best_known.total_difficulty() {
445            (header, total_difficulty).into()
446        } else {
447            best_known
448        }
449    }
450
451    /// Process a new block sync from other peer
452    //TODO: process block which we don't request
453    pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
454        let block_hash = remote_block.block.hash();
455        let status = self.shared.active_chain().get_block_status(&block_hash);
456        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
457        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
458        if status.contains(BlockStatus::BLOCK_STORED) {
459            error!("Block {} already stored", block_hash);
460        } else if status.contains(BlockStatus::HEADER_VALID) {
461            self.shared.accept_remote_block(&self.chain, remote_block);
462        } else {
463            debug!(
464                "Synchronizer process_new_block unexpected status {:?} {}",
465                status, block_hash,
466            );
467            // TODO which error should we return?
468        }
469    }
470
471    /// Process new block in blocking way
472    #[cfg(test)]
473    pub fn blocking_process_new_block(
474        &self,
475        block: core::BlockView,
476        _peer_id: PeerIndex,
477    ) -> Result<bool, ckb_error::Error> {
478        let block_hash = block.hash();
479        let status = self.shared.active_chain().get_block_status(&block_hash);
480        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
481        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
482        if status.contains(BlockStatus::BLOCK_STORED) {
483            error!("block {} already stored", block_hash);
484            Ok(false)
485        } else if status.contains(BlockStatus::HEADER_VALID) {
486            self.chain.blocking_process_block(Arc::new(block))
487        } else {
488            debug!(
489                "Synchronizer process_new_block unexpected status {:?} {}",
490                status, block_hash,
491            );
492            // TODO while error should we return?
493            Ok(false)
494        }
495    }
496
497    /// Builds a `BlockFetcher` for `peer` with the given IBD state and returns the result
    /// of `fetch(BlockNumber::MAX)` unchanged, i.e. with no upper bound on the fetch end.
    ///
    /// The result is a list of hash batches, or `None` when the fetcher returns none.
498    pub fn get_blocks_to_fetch(
499        &self,
500        peer: PeerIndex,
501        ibd: IBDState,
502    ) -> Option<Vec<Vec<packed::Byte32>>> {
503        BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
504    }
505
506    pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
507        let pid = SupportProtocols::Sync.protocol_id();
508        let (is_outbound, is_whitelist, is_2023edition) = nc
509            .get_peer(peer)
510            .map(|peer| {
511                (
512                    peer.is_outbound(),
513                    peer.is_whitelist,
514                    peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
515                )
516            })
517            .unwrap_or((false, false, false));
518
519        self.peers()
520            .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
521    }
522
523    /// Regularly check and eject some nodes that do not respond in time
    ///
    /// Two checks run for every peer: the headers-sync controller timeout, and — outside
    /// of IBD and for outbound peers only — the chain-sync timeout described below.
    /// Peers collected for eviction are disconnected after the iteration has finished.
524    //   - If at timeout their best known block now has more work than our tip
525    //     when the timeout was set, then either reset the timeout or clear it
526    //     (after comparing against our current tip's work)
527    //   - If at timeout their best known block still has less work than our
528    //     tip did when the timeout was set, then send a getheaders message,
529    //     and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future.
530    //     If their best known block is still behind when that new timeout is
531    //     reached, disconnect.
532    pub fn eviction(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
533        let active_chain = self.shared.active_chain();
534        let mut eviction = Vec::new();
535        let better_tip_header = self.better_tip_header();
536        for mut kv_pair in self.peers().state.iter_mut() {
537            let (peer, state) = kv_pair.pair_mut();
538            let now = unix_time_as_millis();
539
540            if let Some(ref mut controller) = state.headers_sync_controller {
541                let better_tip_ts = better_tip_header.timestamp();
                // `Some(true)`: evict the peer. `Some(false)`: nothing to do here.
                // `None`: send `GetHeaders` from the better tip to the peer.
542                if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
543                    if is_timeout {
544                        eviction.push(*peer);
545                        continue;
546                    }
547                } else {
548                    active_chain.send_getheaders_to_peer(
549                        nc,
550                        *peer,
551                        better_tip_header.number_and_hash(),
552                    );
553                }
554            }
555
556            // On ibd, node should only have one peer to sync headers, and it's state can control by
557            // headers_sync_controller.
558            //
559            // The header sync of other nodes does not matter in the ibd phase, and parallel synchronization
560            // can be enabled by unknown list, so there is no need to repeatedly download headers with
561            // multiple nodes at the same time.
562            if active_chain.is_initial_block_download() {
563                continue;
564            }
565            if state.peer_flags.is_outbound {
566                let best_known_header = state.best_known_header.as_ref();
567                let (tip_header, local_total_difficulty) = {
568                    (
569                        active_chain.tip_header().to_owned(),
570                        active_chain.total_difficulty().to_owned(),
571                    )
572                };
                // The peer has at least as much work as our tip: clear any pending timeout.
                // A peer without a best known header counts as the default difficulty.
573                if best_known_header
574                    .map(|header_index| header_index.total_difficulty().clone())
575                    .unwrap_or_default()
576                    >= local_total_difficulty
577                {
578                    if state.chain_sync.timeout != 0 {
579                        state.chain_sync.timeout = 0;
580                        state.chain_sync.work_header = None;
581                        state.chain_sync.total_difficulty = None;
582                        state.chain_sync.sent_getheaders = false;
583                    }
584                } else if state.chain_sync.timeout == 0
585                    || (best_known_header.is_some()
586                        && best_known_header
587                            .map(|header_index| header_index.total_difficulty().clone())
588                            >= state.chain_sync.total_difficulty)
589                {
590                    // Our best block known by this peer is behind our tip, and we're either noticing
591                    // that for the first time, OR this peer was able to catch up to some earlier point
592                    // where we checked against our tip.
593                    // Either way, set a new timeout based on current tip.
594                    state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
595                    state.chain_sync.work_header = Some(tip_header);
596                    state.chain_sync.total_difficulty = Some(local_total_difficulty);
597                    state.chain_sync.sent_getheaders = false;
598                } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
599                    // No evidence yet that our peer has synced to a chain with work equal to that
600                    // of our tip, when we first detected it was behind. Send a single getheaders
601                    // message to give the peer a chance to update us.
602                    if state.chain_sync.sent_getheaders {
                        // The second timeout expired as well. Protected and whitelisted
                        // peers are not evicted; their sync is suspended if it had started.
603                        if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
604                            if state.sync_started() {
605                                self.shared().state().suspend_sync(state);
606                            }
607                        } else {
608                            eviction.push(*peer);
609                        }
610                    } else {
611                        state.chain_sync.sent_getheaders = true;
612                        state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
613                        active_chain.send_getheaders_to_peer(
614                            nc,
615                            *peer,
616                            state
617                                .chain_sync
618                                .work_header
619                                .as_ref()
620                                .expect("work_header be assigned")
621                                .into(),
622                        );
623                    }
624                }
625            }
626        }
        // Disconnect only after the iteration over the peer map has finished.
627        for peer in eviction {
628            info!("Timeout eviction peer={}", peer);
629            if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
630                debug!("synchronizer disconnect error: {:?}", err);
631            }
632        }
633    }
634
635    fn start_sync_headers(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
636        let now = unix_time_as_millis();
637        let active_chain = self.shared.active_chain();
638        let ibd = active_chain.is_initial_block_download();
639        let peers: Vec<PeerIndex> = self
640            .peers()
641            .state
642            .iter()
643            .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
644            .map(|kv_pair| *kv_pair.key())
645            .collect();
646
647        if peers.is_empty() {
648            return;
649        }
650
651        let tip = self.better_tip_header();
652
653        for peer in peers {
654            // Only sync with 1 peer if we're in IBD
655            if self
656                .shared()
657                .state()
658                .n_sync_started()
659                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
660                    if ibd && x != 0 { None } else { Some(x + 1) }
661                })
662                .is_err()
663            {
664                break;
665            }
666            {
667                if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
668                    peer_state.start_sync(HeadersSyncController::from_header(&tip));
669                }
670            }
671
672            debug!("Start sync peer={}", peer);
673            active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
674        }
675    }
676
677    fn get_peers_to_fetch(
678        &self,
679        ibd: IBDState,
680        disconnect_list: &HashSet<PeerIndex>,
681    ) -> Vec<PeerIndex> {
682        trace!("Poll find_blocks_to_fetch selecting peers");
683        let state = &self
684            .shared
685            .state()
686            .read_inflight_blocks()
687            .download_schedulers;
688        let mut peers: Vec<PeerIndex> = self
689            .peers()
690            .state
691            .iter()
692            .filter(|kv_pair| {
693                let (id, state) = kv_pair.pair();
694                if disconnect_list.contains(id) {
695                    return false;
696                };
697                match ibd {
698                    IBDState::In => {
699                        state.peer_flags.is_outbound
700                            || state.peer_flags.is_whitelist
701                            || state.peer_flags.is_protect
702                    }
703                    IBDState::Out => state.started_or_tip_synced(),
704                }
705            })
706            .map(|kv_pair| *kv_pair.key())
707            .collect();
708        peers.sort_by_key(|id| {
709            ::std::cmp::Reverse(
710                state
711                    .get(id)
712                    .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
713            )
714        });
715        peers
716    }
717
718    fn find_blocks_to_fetch(&mut self, nc: &Arc<dyn CKBProtocolContext + Sync>, ibd: IBDState) {
719        if self.chain.is_verifying_unverified_blocks_on_startup() {
720            trace!(
721                "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
722            );
723            return;
724        }
725
726        if ckb_stop_handler::has_received_stop_signal() {
727            info!("received stop signal, stop find_blocks_to_fetch");
728            return;
729        }
730
731        let unverified_tip = self.shared.active_chain().unverified_tip_number();
732
733        let disconnect_list = {
734            let mut list = self
735                .shared()
736                .state()
737                .write_inflight_blocks()
738                .prune(unverified_tip);
739            if let IBDState::In = ibd {
740                // best known < tip and in IBD state, and unknown list is empty,
741                // these node can be disconnect
742                list.extend(
743                    self.shared
744                        .state()
745                        .peers()
746                        .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
747                )
748            };
749            list
750        };
751
752        for peer in disconnect_list.iter() {
753            // It is not forbidden to evict protected nodes:
754            // - First of all, this node is not designated by the user for protection,
755            //   but is connected randomly. It does not represent the will of the user
756            // - Secondly, in the synchronization phase, the nodes with zero download tasks are
757            //   retained, apart from reducing the download efficiency, there is no benefit.
758            if self
759                .peers()
760                .get_flag(*peer)
761                .map(|flag| flag.is_whitelist)
762                .unwrap_or(false)
763            {
764                continue;
765            }
766            let nc = Arc::clone(nc);
767            let peer = *peer;
768            self.shared.shared().async_handle().spawn(async move {
769                let _status = nc.async_disconnect(peer, "sync disconnect").await;
770            });
771        }
772
773        // fetch use a lot of cpu time, especially in ibd state
774        // so, the fetch function use another thread
775        match nc.p2p_control() {
776            Some(raw) => match self.fetch_channel {
777                Some(ref sender) => {
778                    if !sender.is_full() {
779                        let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
780                        let _ignore = sender.try_send(FetchCMD {
781                            peers,
782                            ibd_state: ibd,
783                        });
784                    }
785                }
786                None => {
787                    let p2p_control = raw.clone();
788                    let (sender, recv) = channel::bounded(2);
789                    let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
790                    sender
791                        .send(FetchCMD {
792                            peers,
793                            ibd_state: ibd,
794                        })
795                        .unwrap();
796                    self.fetch_channel = Some(sender);
797                    let thread = ::std::thread::Builder::new();
798                    let number = self.shared.state().shared_best_header_ref().number();
799                    const THREAD_NAME: &str = "BlockDownload";
800                    let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
801                    let blockdownload_jh = thread
802                        .name(THREAD_NAME.into())
803                        .spawn(move || {
804                            let stop_signal = new_crossbeam_exit_rx();
805                            BlockFetchCMD {
806                                sync_shared,
807                                p2p_control,
808                                recv,
809                                number,
810                                can_start: CanStart::MinWorkNotReach,
811                                start_timestamp: unix_time_as_millis(),
812                            }
813                            .run(stop_signal);
814                        })
815                        .expect("download thread can't start");
816                    register_thread(THREAD_NAME, blockdownload_jh);
817                }
818            },
819            None => {
820                for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
821                    if let Some(fetch) =
822                        tokio::task::block_in_place(|| self.get_blocks_to_fetch(peer, ibd))
823                    {
824                        for item in fetch {
825                            self.send_getblocks(item, nc, peer);
826                        }
827                    }
828                }
829            }
830        }
831    }
832
833    fn send_getblocks(
834        &self,
835        v_fetch: Vec<packed::Byte32>,
836        nc: &Arc<dyn CKBProtocolContext + Sync>,
837        peer: PeerIndex,
838    ) {
839        let content = packed::GetBlocks::new_builder()
840            .block_hashes(v_fetch.clone())
841            .build();
842        let message = packed::SyncMessage::new_builder().set(content).build();
843
844        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
845        let nc = Arc::clone(nc);
846        self.shared.shared().async_handle().spawn(async move {
847            let _status = async_send_message_to(&nc, peer, &message).await;
848        });
849    }
850}
851
852#[async_trait]
853impl CKBProtocolHandler for Synchronizer {
854    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
855        // NOTE: 100ms is what bitcoin use.
856        nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
857            .await
858            .expect("set_notify at init is ok");
859        nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
860            .await
861            .expect("set_notify at init is ok");
862        nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
863            .await
864            .expect("set_notify at init is ok");
865        nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
866            .await
867            .expect("set_notify at init is ok");
868        nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
869            .await
870            .expect("set_notify at init is ok");
871    }
872
873    async fn received(
874        &mut self,
875        nc: Arc<dyn CKBProtocolContext + Sync>,
876        peer_index: PeerIndex,
877        data: Bytes,
878    ) {
879        let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
880            Ok(msg) => {
881                let item = msg.to_enum();
882                if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
883                    if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
884                        info!(
885                            "A malformed message from peer {}: \
886                             excessive fields detected in SendBlock",
887                            peer_index
888                        );
889                        nc.ban_peer(
890                            peer_index,
891                            BAD_MESSAGE_BAN_TIME,
892                            String::from(
893                                "send us a malformed message: \
894                                 too many fields in SendBlock",
895                            ),
896                        );
897                        return;
898                    } else {
899                        item
900                    }
901                } else {
902                    match packed::SyncMessageReader::from_slice(&data) {
903                        Ok(msg) => msg.to_enum(),
904                        _ => {
905                            info!(
906                                "A malformed message from peer {}: \
907                                 excessive fields",
908                                peer_index
909                            );
910                            nc.ban_peer(
911                                peer_index,
912                                BAD_MESSAGE_BAN_TIME,
913                                String::from(
914                                    "send us a malformed message: \
915                                     too many fields",
916                                ),
917                            );
918                            return;
919                        }
920                    }
921                }
922            }
923            _ => {
924                info!("A malformed message from peer {}", peer_index);
925                nc.ban_peer(
926                    peer_index,
927                    BAD_MESSAGE_BAN_TIME,
928                    String::from("send us a malformed message"),
929                );
930                return;
931            }
932        };
933
934        debug!("Received msg {} from {}", msg.item_name(), peer_index);
935        #[cfg(feature = "with_sentry")]
936        {
937            let sentry_hub = sentry::Hub::current();
938            let _scope_guard = sentry_hub.push_scope();
939            sentry_hub.configure_scope(|scope| {
940                scope.set_tag("p2p.protocol", "synchronizer");
941                scope.set_tag("p2p.message", msg.item_name());
942            });
943        }
944
945        let start_time = Instant::now();
946        self.process(nc, peer_index, msg).await;
947        debug!(
948            "Process message={}, peer={}, cost={:?}",
949            msg.item_name(),
950            peer_index,
951            Instant::now().saturating_duration_since(start_time),
952        );
953    }
954
955    async fn connected(
956        &mut self,
957        nc: Arc<dyn CKBProtocolContext + Sync>,
958        peer_index: PeerIndex,
959        _version: &str,
960    ) {
961        info!("SyncProtocol.connected peer={}", peer_index);
962        self.on_connected(nc.as_ref(), peer_index);
963    }
964
965    async fn disconnected(
966        &mut self,
967        _nc: Arc<dyn CKBProtocolContext + Sync>,
968        peer_index: PeerIndex,
969    ) {
970        let sync_state = self.shared().state();
971        sync_state.disconnected(peer_index);
972        info!("SyncProtocol.disconnected peer={}", peer_index);
973    }
974
975    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
976        if !self.peers().state.is_empty() {
977            let start_time = Instant::now();
978            trace!("Start notify token={}", token);
979            match token {
980                SEND_GET_HEADERS_TOKEN => {
981                    self.start_sync_headers(&nc);
982                }
983                IBD_BLOCK_FETCH_TOKEN => {
984                    if self.shared.active_chain().is_initial_block_download() {
985                        self.find_blocks_to_fetch(&nc, IBDState::In);
986                    } else {
987                        {
988                            self.shared.state().write_inflight_blocks().adjustment = false;
989                        }
990                        self.shared.state().peers().clear_unknown_list();
991                        if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
992                            trace!("Ibd block fetch token removal failed");
993                        }
994                    }
995                }
996                NOT_IBD_BLOCK_FETCH_TOKEN => {
997                    if !self.shared.active_chain().is_initial_block_download() {
998                        self.find_blocks_to_fetch(&nc, IBDState::Out);
999                    }
1000                }
1001                TIMEOUT_EVICTION_TOKEN => {
1002                    self.eviction(&nc);
1003                }
1004                // Here is just for NO_PEER_CHECK_TOKEN token, only handle it when there is no peer.
1005                _ => {}
1006            }
1007
1008            trace!(
1009                "Finished notify token={} cost={:?}",
1010                token,
1011                Instant::now().saturating_duration_since(start_time)
1012            );
1013        } else if token == NO_PEER_CHECK_TOKEN {
1014            debug!("No peers connected");
1015        }
1016    }
1017}