Skip to main content

ckb_sync/synchronizer/
mod.rs

//! A CKB node has an initial block download phase (IBD mode), like Bitcoin:
//! <https://btcinformation.org/en/glossary/initial-block-download>
//!
//! While a CKB node is in IBD mode, it responds to `GetHeaders` and `GetBlocks` requests
//! with `packed::InIBD`.
//!
//! CKB also uses a headers-first synchronization style, like Bitcoin:
//! <https://btcinformation.org/en/glossary/headers-first-sync>
//!
9mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, async_send_message_to, metric_ckb_message_bytes};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{Receiver, select};
31use ckb_constant::sync::{
32    BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33    INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38    CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceAsyncControl, ServiceControl,
39    SupportProtocols, async_trait, bytes::Bytes, tokio,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48    core::BlockNumber,
49    packed::{self, Byte32},
50    prelude::*,
51};
52use std::{
53    collections::HashSet,
54    sync::{Arc, atomic::Ordering},
55    time::{Duration, Instant},
56};
57
/// Notify token for the periodic "send `GetHeaders`" task
/// (presumably registered as a protocol notify timer — dispatch not shown here).
pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
/// Notify token for block fetching while the node is in IBD.
pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
/// Notify token for block fetching once the node has left IBD.
pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
/// Notify token for the timeout-based peer eviction check.
pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
/// Notify token for the "no peer connected" check.
pub const NO_PEER_CHECK_TOKEN: u64 = 255;

/// Interval of the general sync notification.
const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_millis(1_000);
/// Interval between block-fetch rounds while in IBD.
const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
/// Interval between block-fetch rounds outside IBD.
const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
68#[derive(Copy, Clone)]
69enum CanStart {
70    FetchToTarget(BlockNumber),
71    Ready,
72    MinWorkNotReach,
73    AssumeValidNotFound,
74}
75
76struct FetchCMD {
77    peers: Vec<PeerIndex>,
78    ibd_state: IBDState,
79}
80
81struct BlockFetchCMD {
82    sync_shared: Arc<SyncShared>,
83    p2p_control: ServiceControl,
84    recv: channel::Receiver<FetchCMD>,
85    can_start: CanStart,
86    number: BlockNumber,
87    start_timestamp: u64,
88}
89
90impl BlockFetchCMD {
91    fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92        let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94        let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95            for peer in peers {
96                if ckb_stop_handler::has_received_stop_signal() {
97                    return;
98                }
99
100                let mut fetch_end: BlockNumber = u64::MAX;
101                if assume_target != 0 {
102                    fetch_end = assume_target
103                }
104
105                if let Some(fetch) =
106                    BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107                        .fetch(fetch_end)
108                {
109                    for item in fetch {
110                        if ckb_stop_handler::has_received_stop_signal() {
111                            return;
112                        }
113                        let ctrl = cmd.p2p_control.clone();
114                        let handle = cmd.sync_shared.shared().async_handle();
115                        handle.spawn(BlockFetchCMD::send_getblocks(item, ctrl, peer));
116                    }
117                }
118            }
119        };
120
121        match self.can_start() {
122            CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
123            CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
124            CanStart::MinWorkNotReach => {
125                let best_known = self.sync_shared.state().shared_best_header_ref();
126                let number = best_known.number();
127                if number != self.number && (number - self.number).is_multiple_of(10000) {
128                    self.number = number;
129                    info!(
130                        "The current best known header number: {}, total difficulty: {:#x}. \
131                                 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
132                        number,
133                        best_known.total_difficulty(),
134                        self.sync_shared.state().min_chain_work()
135                    );
136                }
137            }
138            CanStart::AssumeValidNotFound => {
139                let state = self.sync_shared.state();
140                let shared = self.sync_shared.shared();
141                let best_known = state.shared_best_header_ref();
142                let number = best_known.number();
143                let assume_valid_target: Byte32 = shared
144                    .assume_valid_targets()
145                    .as_ref()
146                    .and_then(|targets| targets.first())
147                    .map(Pack::pack)
148                    .expect("assume valid target must exist");
149
150                if number != self.number && (number - self.number).is_multiple_of(10000) {
151                    self.number = number;
152                    let remaining_headers_sync_log = self.reaming_headers_sync_log();
153
154                    info!(
155                        "best known header {}-{}, \
156                                 CKB is syncing to latest Header to find the assume valid target: {}. \
157                                 Please wait. {}",
158                        number,
159                        best_known.hash(),
160                        assume_valid_target,
161                        remaining_headers_sync_log
162                    );
163                }
164            }
165        }
166    }
167
168    fn reaming_headers_sync_log(&self) -> String {
169        match self.calc_time_need_to_reach_latest_tip_header() {
170            Some(remaining) => {
171                let secs = remaining.as_secs();
172                match secs {
173                    0 => "Almost synced.".to_string(),
174                    1..=59 => format!("Need {} seconds to sync to the latest Header.", secs),
175                    60..=3599 => {
176                        format!("Need {} minutes to sync to the latest Header.", secs / 60)
177                    }
178                    _ => {
179                        let hours = secs / 3600;
180                        let minutes = (secs % 3600) / 60;
181                        format!(
182                            "Need {} hours {} minutes to sync to the latest Header.",
183                            hours, minutes
184                        )
185                    }
186                }
187            }
188            None => "".to_string(),
189        }
190    }
191
192    // Timeline:
193    //
194    // |-------------------|--------------------------------|------------|---->
195    // Genesis  (shared best timestamp)                     |           now
196    // |                   |                                |            |
197    // |             (Sync point)                  (CKB process start)   |
198    // |                   |                                             |
199    // |--Synced Part------|------------ Remain to Sync -----------------|
200    // |                                                                 |
201    // |------------------- CKB Chain Age -------------------------------|
202    //
203    fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
204        let genesis_timestamp = self
205            .sync_shared
206            .consensus()
207            .genesis_block()
208            .header()
209            .timestamp();
210        let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
211
212        let ckb_process_start_timestamp = self.start_timestamp;
213
214        let now_timestamp = unix_time_as_millis();
215
216        // Use floating point to avoid integer division precision loss
217        let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)? as f64;
218        let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)? as f64;
219        let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)? as f64;
220
221        if ckb_process_age <= 0.0 || has_synced_headers_age <= 0.0 {
222            return None;
223        }
224
225        let ckb_sync_header_speed = has_synced_headers_age / ckb_process_age;
226        let sync_all_headers_timecost = ckb_chain_age / ckb_sync_header_speed;
227        let sync_remaining_headers_needed = sync_all_headers_timecost - ckb_process_age;
228
229        if sync_remaining_headers_needed <= 0.0 {
230            Some(Duration::from_millis(0))
231        } else {
232            Some(Duration::from_millis(sync_remaining_headers_needed as u64))
233        }
234    }
235
236    fn run(&mut self, stop_signal: Receiver<()>) {
237        loop {
238            select! {
239                recv(self.recv) -> msg => {
240                    if let Ok(cmd) = msg {
241                        self.process_fetch_cmd(cmd)
242                    }
243                }
244                recv(stop_signal) -> _ => {
245                    info!("BlockDownload received exit signal, exit now");
246                    return;
247                }
248            }
249        }
250    }
251
252    fn can_start(&mut self) -> CanStart {
253        if let CanStart::Ready = self.can_start {
254            return self.can_start;
255        }
256
257        let shared = self.sync_shared.shared();
258        let state = self.sync_shared.state();
259
260        let min_work_reach = |flag: &mut CanStart| {
261            if state.min_chain_work_ready() {
262                *flag = CanStart::AssumeValidNotFound;
263            }
264        };
265
266        let assume_valid_target_find = |flag: &mut CanStart| {
267            let mut assume_valid_targets = shared.assume_valid_targets();
268            if let Some(ref targets) = *assume_valid_targets {
269                if targets.is_empty() {
270                    assume_valid_targets.take();
271                    *flag = CanStart::Ready;
272                    return;
273                }
274                let first_target = targets
275                    .first()
276                    .expect("has checked targets is not empty, assume valid target must exist");
277                match shared.header_map().get(&first_target.into()) {
278                    Some(header) => {
279                        if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
280                        {
281                            // BlockFetchCMD has set the fetch target, no need to set it again
282                        } else {
283                            *flag = CanStart::FetchToTarget(header.number());
284                            info!(
285                                "assume valid target found in header_map; CKB will start fetch blocks to {:?} now",
286                                header.number_and_hash()
287                            );
288                        }
289                        // Blocks that are no longer in the scope of ibd must be forced to verify
290                        if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
291                            assume_valid_targets.take();
292                            warn!(
293                                "the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on"
294                            );
295                        }
296                    }
297                    None => {
298                        // Best known already not in the scope of ibd, it means target is invalid
299                        if unix_time_as_millis()
300                            .saturating_sub(state.shared_best_header_ref().timestamp())
301                            < MAX_TIP_AGE
302                        {
303                            warn!(
304                                "the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on"
305                            );
306                            *flag = CanStart::Ready;
307                            assume_valid_targets.take();
308                        }
309                    }
310                }
311            } else {
312                *flag = CanStart::Ready;
313            }
314        };
315
316        match self.can_start {
317            CanStart::FetchToTarget(_) => {
318                assume_valid_target_find(&mut self.can_start);
319                self.can_start
320            }
321            CanStart::Ready => self.can_start,
322            CanStart::MinWorkNotReach => {
323                min_work_reach(&mut self.can_start);
324                if let CanStart::AssumeValidNotFound = self.can_start {
325                    assume_valid_target_find(&mut self.can_start);
326                }
327                self.can_start
328            }
329            CanStart::AssumeValidNotFound => {
330                assume_valid_target_find(&mut self.can_start);
331                self.can_start
332            }
333        }
334    }
335
336    async fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: ServiceControl, peer: PeerIndex) {
337        let content = packed::GetBlocks::new_builder()
338            .block_hashes(v_fetch.clone())
339            .build();
340        let message = packed::SyncMessage::new_builder().set(content).build();
341
342        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
343        if let Err(err) = Into::<ServiceAsyncControl>::into(nc)
344            .send_message_to(
345                peer,
346                SupportProtocols::Sync.protocol_id(),
347                message.as_bytes(),
348            )
349            .await
350        {
351            debug!("synchronizer sending GetBlocks error: {:?}", err);
352        }
353    }
354}
355
356/// Sync protocol handle
357pub struct Synchronizer {
358    pub(crate) chain: ChainController,
359    /// Sync shared state
360    pub shared: Arc<SyncShared>,
361    fetch_channel: Option<channel::Sender<FetchCMD>>,
362}
363
364impl Synchronizer {
365    /// Init sync protocol handle
366    ///
367    /// This is a runtime sync protocol shared state, and any Sync protocol messages will be processed and forwarded by it
368    pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
369        Synchronizer {
370            chain,
371            shared,
372            fetch_channel: None,
373        }
374    }
375
376    /// Get shared state
377    pub fn shared(&self) -> &Arc<SyncShared> {
378        &self.shared
379    }
380
381    async fn try_process(
382        &self,
383        nc: Arc<dyn CKBProtocolContext + Sync>,
384        peer: PeerIndex,
385        message: packed::SyncMessageUnionReader<'_>,
386    ) -> Status {
387        let _trace_timecost: Option<HistogramTimer> = {
388            ckb_metrics::handle().map(|handle| {
389                handle
390                    .ckb_sync_msg_process_duration
391                    .with_label_values(&[message.item_name()])
392                    .start_timer()
393            })
394        };
395
396        match message {
397            packed::SyncMessageUnionReader::GetHeaders(reader) => {
398                tokio::task::block_in_place(|| {
399                    GetHeadersProcess::new(reader, self, peer, &nc).execute()
400                })
401            }
402            packed::SyncMessageUnionReader::SendHeaders(reader) => {
403                tokio::task::block_in_place(|| {
404                    HeadersProcess::new(reader, self, peer, &nc).execute()
405                })
406            }
407            packed::SyncMessageUnionReader::GetBlocks(reader) => {
408                tokio::task::block_in_place(|| {
409                    GetBlocksProcess::new(reader, self, peer, &nc).execute()
410                })
411            }
412            packed::SyncMessageUnionReader::SendBlock(reader) => {
413                if reader.check_data() {
414                    BlockProcess::new(reader, self, peer, nc).execute()
415                } else {
416                    StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
417                }
418            }
419            packed::SyncMessageUnionReader::InIBD(_) => {
420                InIBDProcess::new(self, peer, &nc).execute().await
421            }
422        }
423    }
424
425    async fn process(
426        &self,
427        nc: Arc<dyn CKBProtocolContext + Sync>,
428        peer: PeerIndex,
429        message: packed::SyncMessageUnionReader<'_>,
430    ) {
431        let item_name = message.item_name();
432        let item_bytes = message.as_slice().len() as u64;
433        let status = self.try_process(Arc::clone(&nc), peer, message).await;
434
435        metric_ckb_message_bytes(
436            MetricDirection::In,
437            &SupportProtocols::Sync.name(),
438            item_name,
439            Some(status.code()),
440            item_bytes,
441        );
442
443        post_sync_process(nc.as_ref(), peer, item_name, status);
444    }
445
446    /// Get peers info
447    pub fn peers(&self) -> &Peers {
448        self.shared().state().peers()
449    }
450
451    fn better_tip_header(&self) -> HeaderIndexView {
452        let (header, total_difficulty) = {
453            let active_chain = self.shared.active_chain();
454            (
455                active_chain.tip_header(),
456                active_chain.total_difficulty().to_owned(),
457            )
458        };
459        let best_known = self.shared.state().shared_best_header();
460        // is_better_chain
461        if total_difficulty > *best_known.total_difficulty() {
462            (header, total_difficulty).into()
463        } else {
464            best_known
465        }
466    }
467
468    /// Process a new block sync from other peer
469    //TODO: process block which we don't request
470    pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
471        let block_hash = remote_block.block.hash();
472        let status = self.shared.active_chain().get_block_status(&block_hash);
473        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
474        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
475        if status.contains(BlockStatus::BLOCK_STORED) {
476            error!("Block {} already stored", block_hash);
477        } else if status.contains(BlockStatus::HEADER_VALID) {
478            self.shared.accept_remote_block(&self.chain, remote_block);
479        } else {
480            debug!(
481                "Synchronizer process_new_block unexpected status {:?} {}",
482                status, block_hash,
483            );
484            // TODO which error should we return?
485        }
486    }
487
488    /// Process new block in blocking way
489    #[cfg(test)]
490    pub fn blocking_process_new_block(
491        &self,
492        block: core::BlockView,
493        _peer_id: PeerIndex,
494    ) -> Result<bool, ckb_error::Error> {
495        let block_hash = block.hash();
496        let status = self.shared.active_chain().get_block_status(&block_hash);
497        // NOTE: Filtering `BLOCK_STORED` but not `BLOCK_RECEIVED`, is for avoiding
498        // stopping synchronization even when orphan_pool maintains dirty items by bugs.
499        if status.contains(BlockStatus::BLOCK_STORED) {
500            error!("block {} already stored", block_hash);
501            Ok(false)
502        } else if status.contains(BlockStatus::HEADER_VALID) {
503            self.chain.blocking_process_block(Arc::new(block))
504        } else {
505            debug!(
506                "Synchronizer process_new_block unexpected status {:?} {}",
507                status, block_hash,
508            );
509            // TODO while error should we return?
510            Ok(false)
511        }
512    }
513
514    /// Get blocks to fetch
515    pub fn get_blocks_to_fetch(
516        &self,
517        peer: PeerIndex,
518        ibd: IBDState,
519    ) -> Option<Vec<Vec<packed::Byte32>>> {
520        BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
521    }
522
523    pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
524        let pid = SupportProtocols::Sync.protocol_id();
525        let (is_outbound, is_whitelist, is_2023edition) = nc
526            .get_peer(peer)
527            .map(|peer| {
528                (
529                    peer.is_outbound(),
530                    peer.is_whitelist,
531                    peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
532                )
533            })
534            .unwrap_or((false, false, false));
535
536        self.peers()
537            .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
538    }
539
540    /// Regularly check and eject some nodes that do not respond in time
541    //   - If at timeout their best known block now has more work than our tip
542    //     when the timeout was set, then either reset the timeout or clear it
543    //     (after comparing against our current tip's work)
544    //   - If at timeout their best known block still has less work than our
545    //     tip did when the timeout was set, then send a getheaders message,
546    //     and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future.
547    //     If their best known block is still behind when that new timeout is
548    //     reached, disconnect.
549    pub fn eviction(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
550        let active_chain = self.shared.active_chain();
551        let mut eviction = Vec::new();
552        let better_tip_header = self.better_tip_header();
553        for mut kv_pair in self.peers().state.iter_mut() {
554            let (peer, state) = kv_pair.pair_mut();
555            let now = unix_time_as_millis();
556
557            if let Some(ref mut controller) = state.headers_sync_controller {
558                let better_tip_ts = better_tip_header.timestamp();
559                if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
560                    if is_timeout {
561                        eviction.push(*peer);
562                        continue;
563                    }
564                } else {
565                    active_chain.send_getheaders_to_peer(
566                        nc,
567                        *peer,
568                        better_tip_header.number_and_hash(),
569                    );
570                }
571            }
572
573            // On ibd, node should only have one peer to sync headers, and it's state can control by
574            // headers_sync_controller.
575            //
576            // The header sync of other nodes does not matter in the ibd phase, and parallel synchronization
577            // can be enabled by unknown list, so there is no need to repeatedly download headers with
578            // multiple nodes at the same time.
579            if active_chain.is_initial_block_download() {
580                continue;
581            }
582            if state.peer_flags.is_outbound {
583                let best_known_header = state.best_known_header.as_ref();
584                let (tip_header, local_total_difficulty) = {
585                    (
586                        active_chain.tip_header().to_owned(),
587                        active_chain.total_difficulty().to_owned(),
588                    )
589                };
590                if best_known_header
591                    .map(|header_index| header_index.total_difficulty().clone())
592                    .unwrap_or_default()
593                    >= local_total_difficulty
594                {
595                    if state.chain_sync.timeout != 0 {
596                        state.chain_sync.timeout = 0;
597                        state.chain_sync.work_header = None;
598                        state.chain_sync.total_difficulty = None;
599                        state.chain_sync.sent_getheaders = false;
600                    }
601                } else if state.chain_sync.timeout == 0
602                    || (best_known_header.is_some()
603                        && best_known_header
604                            .map(|header_index| header_index.total_difficulty().clone())
605                            >= state.chain_sync.total_difficulty)
606                {
607                    // Our best block known by this peer is behind our tip, and we're either noticing
608                    // that for the first time, OR this peer was able to catch up to some earlier point
609                    // where we checked against our tip.
610                    // Either way, set a new timeout based on current tip.
611                    state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
612                    state.chain_sync.work_header = Some(tip_header);
613                    state.chain_sync.total_difficulty = Some(local_total_difficulty);
614                    state.chain_sync.sent_getheaders = false;
615                } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
616                    // No evidence yet that our peer has synced to a chain with work equal to that
617                    // of our tip, when we first detected it was behind. Send a single getheaders
618                    // message to give the peer a chance to update us.
619                    if state.chain_sync.sent_getheaders {
620                        if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
621                            if state.sync_started() {
622                                self.shared().state().suspend_sync(state);
623                            }
624                        } else {
625                            eviction.push(*peer);
626                        }
627                    } else {
628                        state.chain_sync.sent_getheaders = true;
629                        state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
630                        active_chain.send_getheaders_to_peer(
631                            nc,
632                            *peer,
633                            state
634                                .chain_sync
635                                .work_header
636                                .as_ref()
637                                .expect("work_header be assigned")
638                                .into(),
639                        );
640                    }
641                }
642            }
643        }
644        for peer in eviction {
645            info!("Timeout eviction peer={}", peer);
646            if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
647                debug!("synchronizer disconnect error: {:?}", err);
648            }
649        }
650    }
651
652    fn start_sync_headers(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
653        let now = unix_time_as_millis();
654        let active_chain = self.shared.active_chain();
655        let ibd = active_chain.is_initial_block_download();
656        let peers: Vec<PeerIndex> = self
657            .peers()
658            .state
659            .iter()
660            .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
661            .map(|kv_pair| *kv_pair.key())
662            .collect();
663
664        if peers.is_empty() {
665            return;
666        }
667
668        let tip = self.better_tip_header();
669
670        for peer in peers {
671            // Only sync with 1 peer if we're in IBD
672            if self
673                .shared()
674                .state()
675                .n_sync_started()
676                .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
677                    if ibd && x != 0 { None } else { Some(x + 1) }
678                })
679                .is_err()
680            {
681                break;
682            }
683            {
684                if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
685                    peer_state.start_sync(HeadersSyncController::from_header(&tip));
686                }
687            }
688
689            debug!("Start sync peer={}", peer);
690            active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
691        }
692    }
693
694    fn get_peers_to_fetch(
695        &self,
696        ibd: IBDState,
697        disconnect_list: &HashSet<PeerIndex>,
698    ) -> Vec<PeerIndex> {
699        trace!("Poll find_blocks_to_fetch selecting peers");
700        let state = &self
701            .shared
702            .state()
703            .read_inflight_blocks()
704            .download_schedulers;
705        let mut peers: Vec<PeerIndex> = self
706            .peers()
707            .state
708            .iter()
709            .filter(|kv_pair| {
710                let (id, state) = kv_pair.pair();
711                if disconnect_list.contains(id) {
712                    return false;
713                };
714                match ibd {
715                    IBDState::In => {
716                        state.peer_flags.is_outbound
717                            || state.peer_flags.is_whitelist
718                            || state.peer_flags.is_protect
719                    }
720                    IBDState::Out => state.started_or_tip_synced(),
721                }
722            })
723            .map(|kv_pair| *kv_pair.key())
724            .collect();
725        peers.sort_by_key(|id| {
726            ::std::cmp::Reverse(
727                state
728                    .get(id)
729                    .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
730            )
731        });
732        peers
733    }
734
735    fn find_blocks_to_fetch(&mut self, nc: &Arc<dyn CKBProtocolContext + Sync>, ibd: IBDState) {
736        if self.chain.is_verifying_unverified_blocks_on_startup() {
737            trace!(
738                "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
739            );
740            return;
741        }
742
743        if ckb_stop_handler::has_received_stop_signal() {
744            info!("received stop signal, stop find_blocks_to_fetch");
745            return;
746        }
747
748        let unverified_tip = self.shared.active_chain().unverified_tip_number();
749
750        let disconnect_list = {
751            let mut list = self
752                .shared()
753                .state()
754                .write_inflight_blocks()
755                .prune(unverified_tip);
756            if let IBDState::In = ibd {
757                // best known < tip and in IBD state, and unknown list is empty,
758                // these node can be disconnect
759                list.extend(
760                    self.shared
761                        .state()
762                        .peers()
763                        .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
764                )
765            };
766            list
767        };
768
769        for peer in disconnect_list.iter() {
770            // It is not forbidden to evict protected nodes:
771            // - First of all, this node is not designated by the user for protection,
772            //   but is connected randomly. It does not represent the will of the user
773            // - Secondly, in the synchronization phase, the nodes with zero download tasks are
774            //   retained, apart from reducing the download efficiency, there is no benefit.
775            if self
776                .peers()
777                .get_flag(*peer)
778                .map(|flag| flag.is_whitelist)
779                .unwrap_or(false)
780            {
781                continue;
782            }
783            let nc = Arc::clone(nc);
784            let peer = *peer;
785            self.shared.shared().async_handle().spawn(async move {
786                let _status = nc.async_disconnect(peer, "sync disconnect").await;
787            });
788        }
789
790        // fetch use a lot of cpu time, especially in ibd state
791        // so, the fetch function use another thread
792        match nc.p2p_control() {
793            Some(raw) => match self.fetch_channel {
794                Some(ref sender) => {
795                    if !sender.is_full() {
796                        let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
797                        let _ignore = sender.try_send(FetchCMD {
798                            peers,
799                            ibd_state: ibd,
800                        });
801                    }
802                }
803                None => {
804                    let p2p_control = raw.clone();
805                    let (sender, recv) = channel::bounded(2);
806                    let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
807                    sender
808                        .send(FetchCMD {
809                            peers,
810                            ibd_state: ibd,
811                        })
812                        .unwrap();
813                    self.fetch_channel = Some(sender);
814                    let thread = ::std::thread::Builder::new();
815                    let number = self.shared.state().shared_best_header_ref().number();
816                    const THREAD_NAME: &str = "BlockDownload";
817                    let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
818                    let blockdownload_jh = thread
819                        .name(THREAD_NAME.into())
820                        .spawn(move || {
821                            let stop_signal = new_crossbeam_exit_rx();
822                            BlockFetchCMD {
823                                sync_shared,
824                                p2p_control,
825                                recv,
826                                number,
827                                can_start: CanStart::MinWorkNotReach,
828                                start_timestamp: unix_time_as_millis(),
829                            }
830                            .run(stop_signal);
831                        })
832                        .expect("download thread can't start");
833                    register_thread(THREAD_NAME, blockdownload_jh);
834                }
835            },
836            None => {
837                for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
838                    if let Some(fetch) =
839                        tokio::task::block_in_place(|| self.get_blocks_to_fetch(peer, ibd))
840                    {
841                        for item in fetch {
842                            self.send_getblocks(item, nc, peer);
843                        }
844                    }
845                }
846            }
847        }
848    }
849
850    fn send_getblocks(
851        &self,
852        v_fetch: Vec<packed::Byte32>,
853        nc: &Arc<dyn CKBProtocolContext + Sync>,
854        peer: PeerIndex,
855    ) {
856        let content = packed::GetBlocks::new_builder()
857            .block_hashes(v_fetch.clone())
858            .build();
859        let message = packed::SyncMessage::new_builder().set(content).build();
860
861        debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
862        let nc = Arc::clone(nc);
863        self.shared.shared().async_handle().spawn(async move {
864            let _status = async_send_message_to(&nc, peer, &message).await;
865        });
866    }
867}
868
869#[async_trait]
870impl CKBProtocolHandler for Synchronizer {
871    async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
872        // NOTE: 100ms is what bitcoin use.
873        nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
874            .await
875            .expect("set_notify at init is ok");
876        nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
877            .await
878            .expect("set_notify at init is ok");
879        nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
880            .await
881            .expect("set_notify at init is ok");
882        nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
883            .await
884            .expect("set_notify at init is ok");
885        nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
886            .await
887            .expect("set_notify at init is ok");
888    }
889
890    async fn received(
891        &mut self,
892        nc: Arc<dyn CKBProtocolContext + Sync>,
893        peer_index: PeerIndex,
894        data: Bytes,
895    ) {
896        let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
897            Ok(msg) => {
898                let item = msg.to_enum();
899                if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
900                    if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
901                        info!(
902                            "A malformed message from peer {}: \
903                             excessive fields detected in SendBlock",
904                            peer_index
905                        );
906                        nc.ban_peer(
907                            peer_index,
908                            BAD_MESSAGE_BAN_TIME,
909                            String::from(
910                                "send us a malformed message: \
911                                 too many fields in SendBlock",
912                            ),
913                        );
914                        return;
915                    } else {
916                        item
917                    }
918                } else {
919                    match packed::SyncMessageReader::from_slice(&data) {
920                        Ok(msg) => msg.to_enum(),
921                        _ => {
922                            info!(
923                                "A malformed message from peer {}: \
924                                 excessive fields",
925                                peer_index
926                            );
927                            nc.ban_peer(
928                                peer_index,
929                                BAD_MESSAGE_BAN_TIME,
930                                String::from(
931                                    "send us a malformed message: \
932                                     too many fields",
933                                ),
934                            );
935                            return;
936                        }
937                    }
938                }
939            }
940            _ => {
941                info!("A malformed message from peer {}", peer_index);
942                nc.ban_peer(
943                    peer_index,
944                    BAD_MESSAGE_BAN_TIME,
945                    String::from("send us a malformed message"),
946                );
947                return;
948            }
949        };
950
951        debug!("Received msg {} from {}", msg.item_name(), peer_index);
952        #[cfg(feature = "with_sentry")]
953        {
954            let sentry_hub = sentry::Hub::current();
955            let _scope_guard = sentry_hub.push_scope();
956            sentry_hub.configure_scope(|scope| {
957                scope.set_tag("p2p.protocol", "synchronizer");
958                scope.set_tag("p2p.message", msg.item_name());
959            });
960        }
961
962        let start_time = Instant::now();
963        self.process(nc, peer_index, msg).await;
964        debug!(
965            "Process message={}, peer={}, cost={:?}",
966            msg.item_name(),
967            peer_index,
968            Instant::now().saturating_duration_since(start_time),
969        );
970    }
971
972    async fn connected(
973        &mut self,
974        nc: Arc<dyn CKBProtocolContext + Sync>,
975        peer_index: PeerIndex,
976        _version: &str,
977    ) {
978        info!("SyncProtocol.connected peer={}", peer_index);
979        self.on_connected(nc.as_ref(), peer_index);
980    }
981
982    async fn disconnected(
983        &mut self,
984        _nc: Arc<dyn CKBProtocolContext + Sync>,
985        peer_index: PeerIndex,
986    ) {
987        let sync_state = self.shared().state();
988        sync_state.disconnected(peer_index);
989        info!("SyncProtocol.disconnected peer={}", peer_index);
990    }
991
992    async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
993        if !self.peers().state.is_empty() {
994            let start_time = Instant::now();
995            trace!("Start notify token={}", token);
996            match token {
997                SEND_GET_HEADERS_TOKEN => {
998                    self.start_sync_headers(&nc);
999                }
1000                IBD_BLOCK_FETCH_TOKEN => {
1001                    if self.shared.active_chain().is_initial_block_download() {
1002                        self.find_blocks_to_fetch(&nc, IBDState::In);
1003                    } else {
1004                        {
1005                            self.shared.state().write_inflight_blocks().adjustment = false;
1006                        }
1007                        self.shared.state().peers().clear_unknown_list();
1008                        if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
1009                            trace!("Ibd block fetch token removal failed");
1010                        }
1011                    }
1012                }
1013                NOT_IBD_BLOCK_FETCH_TOKEN => {
1014                    if !self.shared.active_chain().is_initial_block_download() {
1015                        self.find_blocks_to_fetch(&nc, IBDState::Out);
1016                    }
1017                }
1018                TIMEOUT_EVICTION_TOKEN => {
1019                    self.eviction(&nc);
1020                }
1021                // Here is just for NO_PEER_CHECK_TOKEN token, only handle it when there is no peer.
1022                _ => {}
1023            }
1024
1025            trace!(
1026                "Finished notify token={} cost={:?}",
1027                token,
1028                Instant::now().saturating_duration_since(start_time)
1029            );
1030        } else if token == NO_PEER_CHECK_TOKEN {
1031            debug!("No peers connected");
1032        }
1033    }
1034}