1mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, async_send_message_to, metric_ckb_message_bytes};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{Receiver, select};
31use ckb_constant::sync::{
32 BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33 INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38 CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceAsyncControl, ServiceControl,
39 SupportProtocols, async_trait, bytes::Bytes, tokio,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48 core::BlockNumber,
49 packed::{self, Byte32},
50 prelude::*,
51};
52use std::{
53 collections::HashSet,
54 sync::{Arc, atomic::Ordering},
55 time::{Duration, Instant},
56};
57
58pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
59pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
60pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
61pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
62pub const NO_PEER_CHECK_TOKEN: u64 = 255;
63
64const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
65const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
66const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
68#[derive(Copy, Clone)]
69enum CanStart {
70 FetchToTarget(BlockNumber),
71 Ready,
72 MinWorkNotReach,
73 AssumeValidNotFound,
74}
75
76struct FetchCMD {
77 peers: Vec<PeerIndex>,
78 ibd_state: IBDState,
79}
80
81struct BlockFetchCMD {
82 sync_shared: Arc<SyncShared>,
83 p2p_control: ServiceControl,
84 recv: channel::Receiver<FetchCMD>,
85 can_start: CanStart,
86 number: BlockNumber,
87 start_timestamp: u64,
88}
89
90impl BlockFetchCMD {
91 fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92 let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94 let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95 for peer in peers {
96 if ckb_stop_handler::has_received_stop_signal() {
97 return;
98 }
99
100 let mut fetch_end: BlockNumber = u64::MAX;
101 if assume_target != 0 {
102 fetch_end = assume_target
103 }
104
105 if let Some(fetch) =
106 BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107 .fetch(fetch_end)
108 {
109 for item in fetch {
110 if ckb_stop_handler::has_received_stop_signal() {
111 return;
112 }
113 let ctrl = cmd.p2p_control.clone();
114 let handle = cmd.sync_shared.shared().async_handle();
115 handle.spawn(BlockFetchCMD::send_getblocks(item, ctrl, peer));
116 }
117 }
118 }
119 };
120
121 match self.can_start() {
122 CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
123 CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
124 CanStart::MinWorkNotReach => {
125 let best_known = self.sync_shared.state().shared_best_header_ref();
126 let number = best_known.number();
127 if number != self.number && (number - self.number).is_multiple_of(10000) {
128 self.number = number;
129 info!(
130 "The current best known header number: {}, total difficulty: {:#x}. \
131 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
132 number,
133 best_known.total_difficulty(),
134 self.sync_shared.state().min_chain_work()
135 );
136 }
137 }
138 CanStart::AssumeValidNotFound => {
139 let state = self.sync_shared.state();
140 let shared = self.sync_shared.shared();
141 let best_known = state.shared_best_header_ref();
142 let number = best_known.number();
143 let assume_valid_target: Byte32 = shared
144 .assume_valid_targets()
145 .as_ref()
146 .and_then(|targets| targets.first())
147 .map(Pack::pack)
148 .expect("assume valid target must exist");
149
150 if number != self.number && (number - self.number).is_multiple_of(10000) {
151 self.number = number;
152 let remaining_headers_sync_log = self.reaming_headers_sync_log();
153
154 info!(
155 "best known header {}-{}, \
156 CKB is syncing to latest Header to find the assume valid target: {}. \
157 Please wait. {}",
158 number,
159 best_known.hash(),
160 assume_valid_target,
161 remaining_headers_sync_log
162 );
163 }
164 }
165 }
166 }
167
168 fn reaming_headers_sync_log(&self) -> String {
169 match self.calc_time_need_to_reach_latest_tip_header() {
170 Some(remaining) => {
171 let secs = remaining.as_secs();
172 match secs {
173 0 => "Almost synced.".to_string(),
174 1..=59 => format!("Need {} seconds to sync to the latest Header.", secs),
175 60..=3599 => {
176 format!("Need {} minutes to sync to the latest Header.", secs / 60)
177 }
178 _ => {
179 let hours = secs / 3600;
180 let minutes = (secs % 3600) / 60;
181 format!(
182 "Need {} hours {} minutes to sync to the latest Header.",
183 hours, minutes
184 )
185 }
186 }
187 }
188 None => "".to_string(),
189 }
190 }
191
192 fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
204 let genesis_timestamp = self
205 .sync_shared
206 .consensus()
207 .genesis_block()
208 .header()
209 .timestamp();
210 let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
211
212 let ckb_process_start_timestamp = self.start_timestamp;
213
214 let now_timestamp = unix_time_as_millis();
215
216 let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)? as f64;
218 let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)? as f64;
219 let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)? as f64;
220
221 if ckb_process_age <= 0.0 || has_synced_headers_age <= 0.0 {
222 return None;
223 }
224
225 let ckb_sync_header_speed = has_synced_headers_age / ckb_process_age;
226 let sync_all_headers_timecost = ckb_chain_age / ckb_sync_header_speed;
227 let sync_remaining_headers_needed = sync_all_headers_timecost - ckb_process_age;
228
229 if sync_remaining_headers_needed <= 0.0 {
230 Some(Duration::from_millis(0))
231 } else {
232 Some(Duration::from_millis(sync_remaining_headers_needed as u64))
233 }
234 }
235
236 fn run(&mut self, stop_signal: Receiver<()>) {
237 loop {
238 select! {
239 recv(self.recv) -> msg => {
240 if let Ok(cmd) = msg {
241 self.process_fetch_cmd(cmd)
242 }
243 }
244 recv(stop_signal) -> _ => {
245 info!("BlockDownload received exit signal, exit now");
246 return;
247 }
248 }
249 }
250 }
251
252 fn can_start(&mut self) -> CanStart {
253 if let CanStart::Ready = self.can_start {
254 return self.can_start;
255 }
256
257 let shared = self.sync_shared.shared();
258 let state = self.sync_shared.state();
259
260 let min_work_reach = |flag: &mut CanStart| {
261 if state.min_chain_work_ready() {
262 *flag = CanStart::AssumeValidNotFound;
263 }
264 };
265
266 let assume_valid_target_find = |flag: &mut CanStart| {
267 let mut assume_valid_targets = shared.assume_valid_targets();
268 if let Some(ref targets) = *assume_valid_targets {
269 if targets.is_empty() {
270 assume_valid_targets.take();
271 *flag = CanStart::Ready;
272 return;
273 }
274 let first_target = targets
275 .first()
276 .expect("has checked targets is not empty, assume valid target must exist");
277 match shared.header_map().get(&first_target.into()) {
278 Some(header) => {
279 if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
280 {
281 } else {
283 *flag = CanStart::FetchToTarget(header.number());
284 info!(
285 "assume valid target found in header_map; CKB will start fetch blocks to {:?} now",
286 header.number_and_hash()
287 );
288 }
289 if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
291 assume_valid_targets.take();
292 warn!(
293 "the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on"
294 );
295 }
296 }
297 None => {
298 if unix_time_as_millis()
300 .saturating_sub(state.shared_best_header_ref().timestamp())
301 < MAX_TIP_AGE
302 {
303 warn!(
304 "the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on"
305 );
306 *flag = CanStart::Ready;
307 assume_valid_targets.take();
308 }
309 }
310 }
311 } else {
312 *flag = CanStart::Ready;
313 }
314 };
315
316 match self.can_start {
317 CanStart::FetchToTarget(_) => {
318 assume_valid_target_find(&mut self.can_start);
319 self.can_start
320 }
321 CanStart::Ready => self.can_start,
322 CanStart::MinWorkNotReach => {
323 min_work_reach(&mut self.can_start);
324 if let CanStart::AssumeValidNotFound = self.can_start {
325 assume_valid_target_find(&mut self.can_start);
326 }
327 self.can_start
328 }
329 CanStart::AssumeValidNotFound => {
330 assume_valid_target_find(&mut self.can_start);
331 self.can_start
332 }
333 }
334 }
335
336 async fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: ServiceControl, peer: PeerIndex) {
337 let content = packed::GetBlocks::new_builder()
338 .block_hashes(v_fetch.clone())
339 .build();
340 let message = packed::SyncMessage::new_builder().set(content).build();
341
342 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
343 if let Err(err) = Into::<ServiceAsyncControl>::into(nc)
344 .send_message_to(
345 peer,
346 SupportProtocols::Sync.protocol_id(),
347 message.as_bytes(),
348 )
349 .await
350 {
351 debug!("synchronizer sending GetBlocks error: {:?}", err);
352 }
353 }
354}
355
356pub struct Synchronizer {
358 pub(crate) chain: ChainController,
359 pub shared: Arc<SyncShared>,
361 fetch_channel: Option<channel::Sender<FetchCMD>>,
362}
363
364impl Synchronizer {
365 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
369 Synchronizer {
370 chain,
371 shared,
372 fetch_channel: None,
373 }
374 }
375
376 pub fn shared(&self) -> &Arc<SyncShared> {
378 &self.shared
379 }
380
381 async fn try_process(
382 &self,
383 nc: Arc<dyn CKBProtocolContext + Sync>,
384 peer: PeerIndex,
385 message: packed::SyncMessageUnionReader<'_>,
386 ) -> Status {
387 let _trace_timecost: Option<HistogramTimer> = {
388 ckb_metrics::handle().map(|handle| {
389 handle
390 .ckb_sync_msg_process_duration
391 .with_label_values(&[message.item_name()])
392 .start_timer()
393 })
394 };
395
396 match message {
397 packed::SyncMessageUnionReader::GetHeaders(reader) => {
398 tokio::task::block_in_place(|| {
399 GetHeadersProcess::new(reader, self, peer, &nc).execute()
400 })
401 }
402 packed::SyncMessageUnionReader::SendHeaders(reader) => {
403 tokio::task::block_in_place(|| {
404 HeadersProcess::new(reader, self, peer, &nc).execute()
405 })
406 }
407 packed::SyncMessageUnionReader::GetBlocks(reader) => {
408 tokio::task::block_in_place(|| {
409 GetBlocksProcess::new(reader, self, peer, &nc).execute()
410 })
411 }
412 packed::SyncMessageUnionReader::SendBlock(reader) => {
413 if reader.check_data() {
414 BlockProcess::new(reader, self, peer, nc).execute()
415 } else {
416 StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
417 }
418 }
419 packed::SyncMessageUnionReader::InIBD(_) => {
420 InIBDProcess::new(self, peer, &nc).execute().await
421 }
422 }
423 }
424
425 async fn process(
426 &self,
427 nc: Arc<dyn CKBProtocolContext + Sync>,
428 peer: PeerIndex,
429 message: packed::SyncMessageUnionReader<'_>,
430 ) {
431 let item_name = message.item_name();
432 let item_bytes = message.as_slice().len() as u64;
433 let status = self.try_process(Arc::clone(&nc), peer, message).await;
434
435 metric_ckb_message_bytes(
436 MetricDirection::In,
437 &SupportProtocols::Sync.name(),
438 item_name,
439 Some(status.code()),
440 item_bytes,
441 );
442
443 post_sync_process(nc.as_ref(), peer, item_name, status);
444 }
445
446 pub fn peers(&self) -> &Peers {
448 self.shared().state().peers()
449 }
450
451 fn better_tip_header(&self) -> HeaderIndexView {
452 let (header, total_difficulty) = {
453 let active_chain = self.shared.active_chain();
454 (
455 active_chain.tip_header(),
456 active_chain.total_difficulty().to_owned(),
457 )
458 };
459 let best_known = self.shared.state().shared_best_header();
460 if total_difficulty > *best_known.total_difficulty() {
462 (header, total_difficulty).into()
463 } else {
464 best_known
465 }
466 }
467
468 pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
471 let block_hash = remote_block.block.hash();
472 let status = self.shared.active_chain().get_block_status(&block_hash);
473 if status.contains(BlockStatus::BLOCK_STORED) {
476 error!("Block {} already stored", block_hash);
477 } else if status.contains(BlockStatus::HEADER_VALID) {
478 self.shared.accept_remote_block(&self.chain, remote_block);
479 } else {
480 debug!(
481 "Synchronizer process_new_block unexpected status {:?} {}",
482 status, block_hash,
483 );
484 }
486 }
487
488 #[cfg(test)]
490 pub fn blocking_process_new_block(
491 &self,
492 block: core::BlockView,
493 _peer_id: PeerIndex,
494 ) -> Result<bool, ckb_error::Error> {
495 let block_hash = block.hash();
496 let status = self.shared.active_chain().get_block_status(&block_hash);
497 if status.contains(BlockStatus::BLOCK_STORED) {
500 error!("block {} already stored", block_hash);
501 Ok(false)
502 } else if status.contains(BlockStatus::HEADER_VALID) {
503 self.chain.blocking_process_block(Arc::new(block))
504 } else {
505 debug!(
506 "Synchronizer process_new_block unexpected status {:?} {}",
507 status, block_hash,
508 );
509 Ok(false)
511 }
512 }
513
514 pub fn get_blocks_to_fetch(
516 &self,
517 peer: PeerIndex,
518 ibd: IBDState,
519 ) -> Option<Vec<Vec<packed::Byte32>>> {
520 BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
521 }
522
523 pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
524 let pid = SupportProtocols::Sync.protocol_id();
525 let (is_outbound, is_whitelist, is_2023edition) = nc
526 .get_peer(peer)
527 .map(|peer| {
528 (
529 peer.is_outbound(),
530 peer.is_whitelist,
531 peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
532 )
533 })
534 .unwrap_or((false, false, false));
535
536 self.peers()
537 .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
538 }
539
540 pub fn eviction(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
550 let active_chain = self.shared.active_chain();
551 let mut eviction = Vec::new();
552 let better_tip_header = self.better_tip_header();
553 for mut kv_pair in self.peers().state.iter_mut() {
554 let (peer, state) = kv_pair.pair_mut();
555 let now = unix_time_as_millis();
556
557 if let Some(ref mut controller) = state.headers_sync_controller {
558 let better_tip_ts = better_tip_header.timestamp();
559 if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
560 if is_timeout {
561 eviction.push(*peer);
562 continue;
563 }
564 } else {
565 active_chain.send_getheaders_to_peer(
566 nc,
567 *peer,
568 better_tip_header.number_and_hash(),
569 );
570 }
571 }
572
573 if active_chain.is_initial_block_download() {
580 continue;
581 }
582 if state.peer_flags.is_outbound {
583 let best_known_header = state.best_known_header.as_ref();
584 let (tip_header, local_total_difficulty) = {
585 (
586 active_chain.tip_header().to_owned(),
587 active_chain.total_difficulty().to_owned(),
588 )
589 };
590 if best_known_header
591 .map(|header_index| header_index.total_difficulty().clone())
592 .unwrap_or_default()
593 >= local_total_difficulty
594 {
595 if state.chain_sync.timeout != 0 {
596 state.chain_sync.timeout = 0;
597 state.chain_sync.work_header = None;
598 state.chain_sync.total_difficulty = None;
599 state.chain_sync.sent_getheaders = false;
600 }
601 } else if state.chain_sync.timeout == 0
602 || (best_known_header.is_some()
603 && best_known_header
604 .map(|header_index| header_index.total_difficulty().clone())
605 >= state.chain_sync.total_difficulty)
606 {
607 state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
612 state.chain_sync.work_header = Some(tip_header);
613 state.chain_sync.total_difficulty = Some(local_total_difficulty);
614 state.chain_sync.sent_getheaders = false;
615 } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
616 if state.chain_sync.sent_getheaders {
620 if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
621 if state.sync_started() {
622 self.shared().state().suspend_sync(state);
623 }
624 } else {
625 eviction.push(*peer);
626 }
627 } else {
628 state.chain_sync.sent_getheaders = true;
629 state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
630 active_chain.send_getheaders_to_peer(
631 nc,
632 *peer,
633 state
634 .chain_sync
635 .work_header
636 .as_ref()
637 .expect("work_header be assigned")
638 .into(),
639 );
640 }
641 }
642 }
643 }
644 for peer in eviction {
645 info!("Timeout eviction peer={}", peer);
646 if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
647 debug!("synchronizer disconnect error: {:?}", err);
648 }
649 }
650 }
651
652 fn start_sync_headers(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
653 let now = unix_time_as_millis();
654 let active_chain = self.shared.active_chain();
655 let ibd = active_chain.is_initial_block_download();
656 let peers: Vec<PeerIndex> = self
657 .peers()
658 .state
659 .iter()
660 .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
661 .map(|kv_pair| *kv_pair.key())
662 .collect();
663
664 if peers.is_empty() {
665 return;
666 }
667
668 let tip = self.better_tip_header();
669
670 for peer in peers {
671 if self
673 .shared()
674 .state()
675 .n_sync_started()
676 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
677 if ibd && x != 0 { None } else { Some(x + 1) }
678 })
679 .is_err()
680 {
681 break;
682 }
683 {
684 if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
685 peer_state.start_sync(HeadersSyncController::from_header(&tip));
686 }
687 }
688
689 debug!("Start sync peer={}", peer);
690 active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
691 }
692 }
693
694 fn get_peers_to_fetch(
695 &self,
696 ibd: IBDState,
697 disconnect_list: &HashSet<PeerIndex>,
698 ) -> Vec<PeerIndex> {
699 trace!("Poll find_blocks_to_fetch selecting peers");
700 let state = &self
701 .shared
702 .state()
703 .read_inflight_blocks()
704 .download_schedulers;
705 let mut peers: Vec<PeerIndex> = self
706 .peers()
707 .state
708 .iter()
709 .filter(|kv_pair| {
710 let (id, state) = kv_pair.pair();
711 if disconnect_list.contains(id) {
712 return false;
713 };
714 match ibd {
715 IBDState::In => {
716 state.peer_flags.is_outbound
717 || state.peer_flags.is_whitelist
718 || state.peer_flags.is_protect
719 }
720 IBDState::Out => state.started_or_tip_synced(),
721 }
722 })
723 .map(|kv_pair| *kv_pair.key())
724 .collect();
725 peers.sort_by_key(|id| {
726 ::std::cmp::Reverse(
727 state
728 .get(id)
729 .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
730 )
731 });
732 peers
733 }
734
735 fn find_blocks_to_fetch(&mut self, nc: &Arc<dyn CKBProtocolContext + Sync>, ibd: IBDState) {
736 if self.chain.is_verifying_unverified_blocks_on_startup() {
737 trace!(
738 "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
739 );
740 return;
741 }
742
743 if ckb_stop_handler::has_received_stop_signal() {
744 info!("received stop signal, stop find_blocks_to_fetch");
745 return;
746 }
747
748 let unverified_tip = self.shared.active_chain().unverified_tip_number();
749
750 let disconnect_list = {
751 let mut list = self
752 .shared()
753 .state()
754 .write_inflight_blocks()
755 .prune(unverified_tip);
756 if let IBDState::In = ibd {
757 list.extend(
760 self.shared
761 .state()
762 .peers()
763 .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
764 )
765 };
766 list
767 };
768
769 for peer in disconnect_list.iter() {
770 if self
776 .peers()
777 .get_flag(*peer)
778 .map(|flag| flag.is_whitelist)
779 .unwrap_or(false)
780 {
781 continue;
782 }
783 let nc = Arc::clone(nc);
784 let peer = *peer;
785 self.shared.shared().async_handle().spawn(async move {
786 let _status = nc.async_disconnect(peer, "sync disconnect").await;
787 });
788 }
789
790 match nc.p2p_control() {
793 Some(raw) => match self.fetch_channel {
794 Some(ref sender) => {
795 if !sender.is_full() {
796 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
797 let _ignore = sender.try_send(FetchCMD {
798 peers,
799 ibd_state: ibd,
800 });
801 }
802 }
803 None => {
804 let p2p_control = raw.clone();
805 let (sender, recv) = channel::bounded(2);
806 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
807 sender
808 .send(FetchCMD {
809 peers,
810 ibd_state: ibd,
811 })
812 .unwrap();
813 self.fetch_channel = Some(sender);
814 let thread = ::std::thread::Builder::new();
815 let number = self.shared.state().shared_best_header_ref().number();
816 const THREAD_NAME: &str = "BlockDownload";
817 let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
818 let blockdownload_jh = thread
819 .name(THREAD_NAME.into())
820 .spawn(move || {
821 let stop_signal = new_crossbeam_exit_rx();
822 BlockFetchCMD {
823 sync_shared,
824 p2p_control,
825 recv,
826 number,
827 can_start: CanStart::MinWorkNotReach,
828 start_timestamp: unix_time_as_millis(),
829 }
830 .run(stop_signal);
831 })
832 .expect("download thread can't start");
833 register_thread(THREAD_NAME, blockdownload_jh);
834 }
835 },
836 None => {
837 for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
838 if let Some(fetch) =
839 tokio::task::block_in_place(|| self.get_blocks_to_fetch(peer, ibd))
840 {
841 for item in fetch {
842 self.send_getblocks(item, nc, peer);
843 }
844 }
845 }
846 }
847 }
848 }
849
850 fn send_getblocks(
851 &self,
852 v_fetch: Vec<packed::Byte32>,
853 nc: &Arc<dyn CKBProtocolContext + Sync>,
854 peer: PeerIndex,
855 ) {
856 let content = packed::GetBlocks::new_builder()
857 .block_hashes(v_fetch.clone())
858 .build();
859 let message = packed::SyncMessage::new_builder().set(content).build();
860
861 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
862 let nc = Arc::clone(nc);
863 self.shared.shared().async_handle().spawn(async move {
864 let _status = async_send_message_to(&nc, peer, &message).await;
865 });
866 }
867}
868
869#[async_trait]
870impl CKBProtocolHandler for Synchronizer {
871 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
872 nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
874 .await
875 .expect("set_notify at init is ok");
876 nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
877 .await
878 .expect("set_notify at init is ok");
879 nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
880 .await
881 .expect("set_notify at init is ok");
882 nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
883 .await
884 .expect("set_notify at init is ok");
885 nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
886 .await
887 .expect("set_notify at init is ok");
888 }
889
890 async fn received(
891 &mut self,
892 nc: Arc<dyn CKBProtocolContext + Sync>,
893 peer_index: PeerIndex,
894 data: Bytes,
895 ) {
896 let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
897 Ok(msg) => {
898 let item = msg.to_enum();
899 if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
900 if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
901 info!(
902 "A malformed message from peer {}: \
903 excessive fields detected in SendBlock",
904 peer_index
905 );
906 nc.ban_peer(
907 peer_index,
908 BAD_MESSAGE_BAN_TIME,
909 String::from(
910 "send us a malformed message: \
911 too many fields in SendBlock",
912 ),
913 );
914 return;
915 } else {
916 item
917 }
918 } else {
919 match packed::SyncMessageReader::from_slice(&data) {
920 Ok(msg) => msg.to_enum(),
921 _ => {
922 info!(
923 "A malformed message from peer {}: \
924 excessive fields",
925 peer_index
926 );
927 nc.ban_peer(
928 peer_index,
929 BAD_MESSAGE_BAN_TIME,
930 String::from(
931 "send us a malformed message: \
932 too many fields",
933 ),
934 );
935 return;
936 }
937 }
938 }
939 }
940 _ => {
941 info!("A malformed message from peer {}", peer_index);
942 nc.ban_peer(
943 peer_index,
944 BAD_MESSAGE_BAN_TIME,
945 String::from("send us a malformed message"),
946 );
947 return;
948 }
949 };
950
951 debug!("Received msg {} from {}", msg.item_name(), peer_index);
952 #[cfg(feature = "with_sentry")]
953 {
954 let sentry_hub = sentry::Hub::current();
955 let _scope_guard = sentry_hub.push_scope();
956 sentry_hub.configure_scope(|scope| {
957 scope.set_tag("p2p.protocol", "synchronizer");
958 scope.set_tag("p2p.message", msg.item_name());
959 });
960 }
961
962 let start_time = Instant::now();
963 self.process(nc, peer_index, msg).await;
964 debug!(
965 "Process message={}, peer={}, cost={:?}",
966 msg.item_name(),
967 peer_index,
968 Instant::now().saturating_duration_since(start_time),
969 );
970 }
971
972 async fn connected(
973 &mut self,
974 nc: Arc<dyn CKBProtocolContext + Sync>,
975 peer_index: PeerIndex,
976 _version: &str,
977 ) {
978 info!("SyncProtocol.connected peer={}", peer_index);
979 self.on_connected(nc.as_ref(), peer_index);
980 }
981
982 async fn disconnected(
983 &mut self,
984 _nc: Arc<dyn CKBProtocolContext + Sync>,
985 peer_index: PeerIndex,
986 ) {
987 let sync_state = self.shared().state();
988 sync_state.disconnected(peer_index);
989 info!("SyncProtocol.disconnected peer={}", peer_index);
990 }
991
992 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
993 if !self.peers().state.is_empty() {
994 let start_time = Instant::now();
995 trace!("Start notify token={}", token);
996 match token {
997 SEND_GET_HEADERS_TOKEN => {
998 self.start_sync_headers(&nc);
999 }
1000 IBD_BLOCK_FETCH_TOKEN => {
1001 if self.shared.active_chain().is_initial_block_download() {
1002 self.find_blocks_to_fetch(&nc, IBDState::In);
1003 } else {
1004 {
1005 self.shared.state().write_inflight_blocks().adjustment = false;
1006 }
1007 self.shared.state().peers().clear_unknown_list();
1008 if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
1009 trace!("Ibd block fetch token removal failed");
1010 }
1011 }
1012 }
1013 NOT_IBD_BLOCK_FETCH_TOKEN => {
1014 if !self.shared.active_chain().is_initial_block_download() {
1015 self.find_blocks_to_fetch(&nc, IBDState::Out);
1016 }
1017 }
1018 TIMEOUT_EVICTION_TOKEN => {
1019 self.eviction(&nc);
1020 }
1021 _ => {}
1023 }
1024
1025 trace!(
1026 "Finished notify token={} cost={:?}",
1027 token,
1028 Instant::now().saturating_duration_since(start_time)
1029 );
1030 } else if token == NO_PEER_CHECK_TOKEN {
1031 debug!("No peers connected");
1032 }
1033 }
1034}