1mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, async_send_message_to, metric_ckb_message_bytes};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{Receiver, select};
31use ckb_constant::sync::{
32 BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33 INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38 CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceAsyncControl, ServiceControl,
39 SupportProtocols, async_trait, bytes::Bytes, tokio,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48 core::BlockNumber,
49 packed::{self, Byte32},
50 prelude::*,
51};
52use std::{
53 collections::HashSet,
54 sync::{Arc, atomic::Ordering},
55 time::{Duration, Instant},
56};
57
/// Notify token on which `notify` calls `start_sync_headers`.
pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
/// Notify token on which `notify` fetches blocks while the chain is in initial
/// block download; the notify is removed once IBD is over.
pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
/// Notify token on which `notify` fetches blocks once the chain is no longer in
/// initial block download.
pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
/// Notify token on which `notify` runs `eviction`.
pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
/// Notify token on which `notify` logs "No peers connected" when the peer map is empty.
pub const NO_PEER_CHECK_TOKEN: u64 = 255;

/// Interval registered in `init` for `SEND_GET_HEADERS_TOKEN` and `TIMEOUT_EVICTION_TOKEN`.
const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
/// Interval registered in `init` for `IBD_BLOCK_FETCH_TOKEN`.
const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
/// Interval registered in `init` for `NOT_IBD_BLOCK_FETCH_TOKEN`.
const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
/// Gate that decides whether the block download thread may request blocks yet.
#[derive(Copy, Clone)]
enum CanStart {
    /// The first assume-valid target was found in the header map; blocks are
    /// fetched up to the contained block number.
    FetchToTarget(BlockNumber),
    /// No restriction applies any more; blocks are fetched without an upper bound.
    Ready,
    /// Initial state: `min_chain_work_ready()` has not reported true yet.
    MinWorkNotReach,
    /// Minimum chain work is reached, but the assume-valid target has not been
    /// found in the header map yet.
    AssumeValidNotFound,
}
75
/// Command sent from `Synchronizer` to the block download thread.
struct FetchCMD {
    /// Peers to ask for blocks, in the order they should be processed.
    peers: Vec<PeerIndex>,
    /// IBD state forwarded to `BlockFetcher::new` for every peer.
    ibd_state: IBDState,
}
80
/// State owned by the block download thread, which consumes `FetchCMD`s.
struct BlockFetchCMD {
    /// Shared synchronizer state (chain view, peers, header map, ...).
    sync_shared: Arc<SyncShared>,
    /// Network control handle used to send `GetBlocks` messages.
    p2p_control: ServiceControl,
    /// Receiving end of the command channel fed by `Synchronizer`.
    recv: channel::Receiver<FetchCMD>,
    /// Cached start gate; re-evaluated by `can_start` until it becomes `Ready`.
    can_start: CanStart,
    /// Best known header number at the time of the last progress log line
    /// (initialised with the best known header number when the thread is created).
    number: BlockNumber,
    /// Millisecond timestamp taken when the thread started; used to estimate
    /// the remaining header sync time.
    start_timestamp: u64,
}
89
90impl BlockFetchCMD {
91 fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92 let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94 let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95 for peer in peers {
96 if ckb_stop_handler::has_received_stop_signal() {
97 return;
98 }
99
100 let mut fetch_end: BlockNumber = u64::MAX;
101 if assume_target != 0 {
102 fetch_end = assume_target
103 }
104
105 if let Some(fetch) =
106 BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107 .fetch(fetch_end)
108 {
109 for item in fetch {
110 if ckb_stop_handler::has_received_stop_signal() {
111 return;
112 }
113 let ctrl = cmd.p2p_control.clone();
114 let handle = cmd.sync_shared.shared().async_handle();
115 handle.spawn(BlockFetchCMD::send_getblocks(item, ctrl, peer));
116 }
117 }
118 }
119 };
120
121 match self.can_start() {
122 CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
123 CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
124 CanStart::MinWorkNotReach => {
125 let best_known = self.sync_shared.state().shared_best_header_ref();
126 let number = best_known.number();
127 if number != self.number && (number - self.number) % 10000 == 0 {
128 self.number = number;
129 info!(
130 "The current best known header number: {}, total difficulty: {:#x}. \
131 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
132 number,
133 best_known.total_difficulty(),
134 self.sync_shared.state().min_chain_work()
135 );
136 }
137 }
138 CanStart::AssumeValidNotFound => {
139 let state = self.sync_shared.state();
140 let shared = self.sync_shared.shared();
141 let best_known = state.shared_best_header_ref();
142 let number = best_known.number();
143 let assume_valid_target: Byte32 = shared
144 .assume_valid_targets()
145 .as_ref()
146 .and_then(|targets| targets.first())
147 .map(Pack::pack)
148 .expect("assume valid target must exist");
149
150 if number != self.number && (number - self.number) % 10000 == 0 {
151 self.number = number;
152 let remaining_headers_sync_log = self.reaming_headers_sync_log();
153
154 info!(
155 "best known header {}-{}, \
156 CKB is syncing to latest Header to find the assume valid target: {}. \
157 Please wait. {}",
158 number,
159 best_known.hash(),
160 assume_valid_target,
161 remaining_headers_sync_log
162 );
163 }
164 }
165 }
166 }
167
168 fn reaming_headers_sync_log(&self) -> String {
169 if let Some(remaining_headers_needed) = self.calc_time_need_to_reach_latest_tip_header() {
170 format!(
171 "Need {} minutes to sync to the latest Header.",
172 remaining_headers_needed.as_secs() / 60
173 )
174 } else {
175 "".to_string()
176 }
177 }
178
179 fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
191 let genesis_timestamp = self
192 .sync_shared
193 .consensus()
194 .genesis_block()
195 .header()
196 .timestamp();
197 let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
198
199 let ckb_process_start_timestamp = self.start_timestamp;
200
201 let now_timestamp = unix_time_as_millis();
202
203 let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)?;
204
205 let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)?;
206
207 let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)?;
208
209 let ckb_sync_header_speed = has_synced_headers_age.checked_div(ckb_process_age)?;
210
211 let sync_all_headers_timecost = ckb_chain_age.checked_div(ckb_sync_header_speed)?;
212
213 let sync_remaining_headers_needed =
214 sync_all_headers_timecost.checked_sub(ckb_process_age)?;
215
216 Some(Duration::from_millis(sync_remaining_headers_needed))
217 }
218
219 fn run(&mut self, stop_signal: Receiver<()>) {
220 loop {
221 select! {
222 recv(self.recv) -> msg => {
223 if let Ok(cmd) = msg {
224 self.process_fetch_cmd(cmd)
225 }
226 }
227 recv(stop_signal) -> _ => {
228 info!("BlockDownload received exit signal, exit now");
229 return;
230 }
231 }
232 }
233 }
234
/// Re-evaluates the start gate, stores it in `self.can_start` and returns it.
///
/// `Ready` is terminal: once it has been reached it is returned without any
/// further checks. From `MinWorkNotReach` the gate moves to
/// `AssumeValidNotFound` as soon as `min_chain_work_ready()` is true, and the
/// assume-valid lookup is attempted in the same call.
fn can_start(&mut self) -> CanStart {
    if let CanStart::Ready = self.can_start {
        return self.can_start;
    }

    let shared = self.sync_shared.shared();
    let state = self.sync_shared.state();

    // Step 1: leave `MinWorkNotReach` once the minimum chain work is reached.
    let min_work_reach = |flag: &mut CanStart| {
        if state.min_chain_work_ready() {
            *flag = CanStart::AssumeValidNotFound;
        }
    };

    // Step 2: resolve the first assume-valid target against the header map.
    let assume_valid_target_find = |flag: &mut CanStart| {
        let mut assume_valid_targets = shared.assume_valid_targets();
        if let Some(ref targets) = *assume_valid_targets {
            if targets.is_empty() {
                // Nothing left to look for: clear the option and start unrestricted.
                assume_valid_targets.take();
                *flag = CanStart::Ready;
                return;
            }
            let first_target = targets
                .first()
                .expect("has checked targets is not empty, assume valid target must exist");
            match shared.header_map().get(&first_target.into()) {
                Some(header) => {
                    if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
                    {
                        // Already fetching to this very target: nothing to update or log.
                    } else {
                        *flag = CanStart::FetchToTarget(header.number());
                        info!(
                            "assume valid target found in header_map; CKB will start fetch blocks to {:?} now",
                            header.number_and_hash()
                        );
                    }
                    // The target is younger than MAX_TIP_AGE: drop the configured
                    // targets. The flag is left untouched here; with no targets
                    // remaining, the next evaluation takes the `else` branch below
                    // and switches to `Ready`.
                    if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
                        assume_valid_targets.take();
                        warn!(
                            "the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on"
                        );
                    }
                }
                None => {
                    // Target still unknown. If the best known header is already
                    // younger than MAX_TIP_AGE, give up on the target and start
                    // unrestricted; otherwise keep waiting in the current state.
                    if unix_time_as_millis()
                        .saturating_sub(state.shared_best_header_ref().timestamp())
                        < MAX_TIP_AGE
                    {
                        warn!(
                            "the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on"
                        );
                        *flag = CanStart::Ready;
                        assume_valid_targets.take();
                    }
                }
            }
        } else {
            // No assume-valid targets configured (or they were cleared earlier).
            *flag = CanStart::Ready;
        }
    };

    match self.can_start {
        CanStart::FetchToTarget(_) => {
            // Re-check: the targets may have been cleared since the last call.
            assume_valid_target_find(&mut self.can_start);
            self.can_start
        }
        CanStart::Ready => self.can_start,
        CanStart::MinWorkNotReach => {
            min_work_reach(&mut self.can_start);
            if let CanStart::AssumeValidNotFound = self.can_start {
                assume_valid_target_find(&mut self.can_start);
            }
            self.can_start
        }
        CanStart::AssumeValidNotFound => {
            assume_valid_target_find(&mut self.can_start);
            self.can_start
        }
    }
}
318
319 async fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: ServiceControl, peer: PeerIndex) {
320 let content = packed::GetBlocks::new_builder()
321 .block_hashes(v_fetch.clone())
322 .build();
323 let message = packed::SyncMessage::new_builder().set(content).build();
324
325 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
326 if let Err(err) = Into::<ServiceAsyncControl>::into(nc)
327 .send_message_to(
328 peer,
329 SupportProtocols::Sync.protocol_id(),
330 message.as_bytes(),
331 )
332 .await
333 {
334 debug!("synchronizer sending GetBlocks error: {:?}", err);
335 }
336 }
337}
338
/// Sync protocol handler: drives header sync, block fetching and peer eviction.
pub struct Synchronizer {
    /// Handle used to hand blocks over to the chain service.
    pub(crate) chain: ChainController,
    /// State shared between the sync related protocols.
    pub shared: Arc<SyncShared>,
    /// Sender side of the command channel to the block download thread;
    /// `None` until `find_blocks_to_fetch` has started that thread.
    fetch_channel: Option<channel::Sender<FetchCMD>>,
}
346
347impl Synchronizer {
348 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
352 Synchronizer {
353 chain,
354 shared,
355 fetch_channel: None,
356 }
357 }
358
/// Returns a reference to the shared synchronizer state.
pub fn shared(&self) -> &Arc<SyncShared> {
    &self.shared
}
363
364 async fn try_process(
365 &self,
366 nc: Arc<dyn CKBProtocolContext + Sync>,
367 peer: PeerIndex,
368 message: packed::SyncMessageUnionReader<'_>,
369 ) -> Status {
370 let _trace_timecost: Option<HistogramTimer> = {
371 ckb_metrics::handle().map(|handle| {
372 handle
373 .ckb_sync_msg_process_duration
374 .with_label_values(&[message.item_name()])
375 .start_timer()
376 })
377 };
378
379 match message {
380 packed::SyncMessageUnionReader::GetHeaders(reader) => {
381 tokio::task::block_in_place(|| {
382 GetHeadersProcess::new(reader, self, peer, &nc).execute()
383 })
384 }
385 packed::SyncMessageUnionReader::SendHeaders(reader) => {
386 tokio::task::block_in_place(|| {
387 HeadersProcess::new(reader, self, peer, &nc).execute()
388 })
389 }
390 packed::SyncMessageUnionReader::GetBlocks(reader) => {
391 tokio::task::block_in_place(|| {
392 GetBlocksProcess::new(reader, self, peer, &nc).execute()
393 })
394 }
395 packed::SyncMessageUnionReader::SendBlock(reader) => {
396 if reader.check_data() {
397 BlockProcess::new(reader, self, peer, nc).execute()
398 } else {
399 StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
400 }
401 }
402 packed::SyncMessageUnionReader::InIBD(_) => {
403 InIBDProcess::new(self, peer, &nc).execute().await
404 }
405 }
406 }
407
408 async fn process(
409 &self,
410 nc: Arc<dyn CKBProtocolContext + Sync>,
411 peer: PeerIndex,
412 message: packed::SyncMessageUnionReader<'_>,
413 ) {
414 let item_name = message.item_name();
415 let item_bytes = message.as_slice().len() as u64;
416 let status = self.try_process(Arc::clone(&nc), peer, message).await;
417
418 metric_ckb_message_bytes(
419 MetricDirection::In,
420 &SupportProtocols::Sync.name(),
421 item_name,
422 Some(status.code()),
423 item_bytes,
424 );
425
426 post_sync_process(nc.as_ref(), peer, item_name, status);
427 }
428
429 pub fn peers(&self) -> &Peers {
431 self.shared().state().peers()
432 }
433
434 fn better_tip_header(&self) -> HeaderIndexView {
435 let (header, total_difficulty) = {
436 let active_chain = self.shared.active_chain();
437 (
438 active_chain.tip_header(),
439 active_chain.total_difficulty().to_owned(),
440 )
441 };
442 let best_known = self.shared.state().shared_best_header();
443 if total_difficulty > *best_known.total_difficulty() {
445 (header, total_difficulty).into()
446 } else {
447 best_known
448 }
449 }
450
451 pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
454 let block_hash = remote_block.block.hash();
455 let status = self.shared.active_chain().get_block_status(&block_hash);
456 if status.contains(BlockStatus::BLOCK_STORED) {
459 error!("Block {} already stored", block_hash);
460 } else if status.contains(BlockStatus::HEADER_VALID) {
461 self.shared.accept_remote_block(&self.chain, remote_block);
462 } else {
463 debug!(
464 "Synchronizer process_new_block unexpected status {:?} {}",
465 status, block_hash,
466 );
467 }
469 }
470
/// Test helper: processes a block synchronously through the chain controller.
///
/// Returns `Ok(false)` without touching the chain when the block is already
/// stored or its header is not marked valid; otherwise returns the result of
/// `blocking_process_block`.
#[cfg(test)]
pub fn blocking_process_new_block(
    &self,
    block: core::BlockView,
    _peer_id: PeerIndex,
) -> Result<bool, ckb_error::Error> {
    let block_hash = block.hash();
    let status = self.shared.active_chain().get_block_status(&block_hash);

    if status.contains(BlockStatus::BLOCK_STORED) {
        error!("block {} already stored", block_hash);
        return Ok(false);
    }

    if !status.contains(BlockStatus::HEADER_VALID) {
        debug!(
            "Synchronizer process_new_block unexpected status {:?} {}",
            status, block_hash,
        );
        return Ok(false);
    }

    self.chain.blocking_process_block(Arc::new(block))
}
496
497 pub fn get_blocks_to_fetch(
499 &self,
500 peer: PeerIndex,
501 ibd: IBDState,
502 ) -> Option<Vec<Vec<packed::Byte32>>> {
503 BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
504 }
505
506 pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
507 let pid = SupportProtocols::Sync.protocol_id();
508 let (is_outbound, is_whitelist, is_2023edition) = nc
509 .get_peer(peer)
510 .map(|peer| {
511 (
512 peer.is_outbound(),
513 peer.is_whitelist,
514 peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
515 )
516 })
517 .unwrap_or((false, false, false));
518
519 self.peers()
520 .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
521 }
522
/// Periodic check that disconnects peers which do not keep up with header sync.
///
/// Two independent checks run per peer:
/// 1. If a headers sync controller is attached, the peer is evicted when the
///    controller reports a timeout; when the controller cannot decide
///    (`None`), a fresh `GetHeaders` is sent instead.
/// 2. Outside initial block download, outbound peers whose best known header
///    stays behind the local total difficulty get a deadline; after it passes
///    they are probed once with `GetHeaders` and finally evicted (protected
///    and whitelisted peers only have their sync suspended).
///
/// Disconnects are issued after the peer map iteration has finished.
pub fn eviction(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
    let active_chain = self.shared.active_chain();
    let mut eviction = Vec::new();
    let better_tip_header = self.better_tip_header();
    for mut kv_pair in self.peers().state.iter_mut() {
        let (peer, state) = kv_pair.pair_mut();
        let now = unix_time_as_millis();

        // Check 1: header sync progress as judged by the controller.
        if let Some(ref mut controller) = state.headers_sync_controller {
            let better_tip_ts = better_tip_header.timestamp();
            if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
                if is_timeout {
                    eviction.push(*peer);
                    continue;
                }
            } else {
                // The controller gave no verdict: ask the peer for headers again.
                active_chain.send_getheaders_to_peer(
                    nc,
                    *peer,
                    better_tip_header.number_and_hash(),
                );
            }
        }

        // Check 2 only applies once initial block download is over.
        if active_chain.is_initial_block_download() {
            continue;
        }
        if state.peer_flags.is_outbound {
            let best_known_header = state.best_known_header.as_ref();
            let (tip_header, local_total_difficulty) = {
                (
                    active_chain.tip_header().to_owned(),
                    active_chain.total_difficulty().to_owned(),
                )
            };
            if best_known_header
                .map(|header_index| header_index.total_difficulty().clone())
                .unwrap_or_default()
                >= local_total_difficulty
            {
                // The peer has at least as much work as we do: clear any pending deadline.
                if state.chain_sync.timeout != 0 {
                    state.chain_sync.timeout = 0;
                    state.chain_sync.work_header = None;
                    state.chain_sync.total_difficulty = None;
                    state.chain_sync.sent_getheaders = false;
                }
            } else if state.chain_sync.timeout == 0
                || (best_known_header.is_some()
                    && best_known_header
                        .map(|header_index| header_index.total_difficulty().clone())
                        >= state.chain_sync.total_difficulty)
            {
                // The peer is behind. Start a deadline, or restart it when the peer
                // has caught up with the difficulty recorded for the previous one,
                // and record the current local tip as the new goal.
                state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
                state.chain_sync.work_header = Some(tip_header);
                state.chain_sync.total_difficulty = Some(local_total_difficulty);
                state.chain_sync.sent_getheaders = false;
            } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
                // The deadline has passed without the peer reaching the recorded goal.
                if state.chain_sync.sent_getheaders {
                    // The probe was already sent and its grace period is over too.
                    if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
                        if state.sync_started() {
                            self.shared().state().suspend_sync(state);
                        }
                    } else {
                        eviction.push(*peer);
                    }
                } else {
                    // Probe once with GetHeaders and grant a shorter response window.
                    state.chain_sync.sent_getheaders = true;
                    state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
                    active_chain.send_getheaders_to_peer(
                        nc,
                        *peer,
                        state
                            .chain_sync
                            .work_header
                            .as_ref()
                            .expect("work_header be assigned")
                            .into(),
                    );
                }
            }
        }
    }
    for peer in eviction {
        info!("Timeout eviction peer={}", peer);
        if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
            debug!("synchronizer disconnect error: {:?}", err);
        }
    }
}
634
635 fn start_sync_headers(&self, nc: &Arc<dyn CKBProtocolContext + Sync>) {
636 let now = unix_time_as_millis();
637 let active_chain = self.shared.active_chain();
638 let ibd = active_chain.is_initial_block_download();
639 let peers: Vec<PeerIndex> = self
640 .peers()
641 .state
642 .iter()
643 .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
644 .map(|kv_pair| *kv_pair.key())
645 .collect();
646
647 if peers.is_empty() {
648 return;
649 }
650
651 let tip = self.better_tip_header();
652
653 for peer in peers {
654 if self
656 .shared()
657 .state()
658 .n_sync_started()
659 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
660 if ibd && x != 0 { None } else { Some(x + 1) }
661 })
662 .is_err()
663 {
664 break;
665 }
666 {
667 if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
668 peer_state.start_sync(HeadersSyncController::from_header(&tip));
669 }
670 }
671
672 debug!("Start sync peer={}", peer);
673 active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
674 }
675 }
676
677 fn get_peers_to_fetch(
678 &self,
679 ibd: IBDState,
680 disconnect_list: &HashSet<PeerIndex>,
681 ) -> Vec<PeerIndex> {
682 trace!("Poll find_blocks_to_fetch selecting peers");
683 let state = &self
684 .shared
685 .state()
686 .read_inflight_blocks()
687 .download_schedulers;
688 let mut peers: Vec<PeerIndex> = self
689 .peers()
690 .state
691 .iter()
692 .filter(|kv_pair| {
693 let (id, state) = kv_pair.pair();
694 if disconnect_list.contains(id) {
695 return false;
696 };
697 match ibd {
698 IBDState::In => {
699 state.peer_flags.is_outbound
700 || state.peer_flags.is_whitelist
701 || state.peer_flags.is_protect
702 }
703 IBDState::Out => state.started_or_tip_synced(),
704 }
705 })
706 .map(|kv_pair| *kv_pair.key())
707 .collect();
708 peers.sort_by_key(|id| {
709 ::std::cmp::Reverse(
710 state
711 .get(id)
712 .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
713 )
714 });
715 peers
716 }
717
718 fn find_blocks_to_fetch(&mut self, nc: &Arc<dyn CKBProtocolContext + Sync>, ibd: IBDState) {
719 if self.chain.is_verifying_unverified_blocks_on_startup() {
720 trace!(
721 "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
722 );
723 return;
724 }
725
726 if ckb_stop_handler::has_received_stop_signal() {
727 info!("received stop signal, stop find_blocks_to_fetch");
728 return;
729 }
730
731 let unverified_tip = self.shared.active_chain().unverified_tip_number();
732
733 let disconnect_list = {
734 let mut list = self
735 .shared()
736 .state()
737 .write_inflight_blocks()
738 .prune(unverified_tip);
739 if let IBDState::In = ibd {
740 list.extend(
743 self.shared
744 .state()
745 .peers()
746 .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
747 )
748 };
749 list
750 };
751
752 for peer in disconnect_list.iter() {
753 if self
759 .peers()
760 .get_flag(*peer)
761 .map(|flag| flag.is_whitelist)
762 .unwrap_or(false)
763 {
764 continue;
765 }
766 let nc = Arc::clone(nc);
767 let peer = *peer;
768 self.shared.shared().async_handle().spawn(async move {
769 let _status = nc.async_disconnect(peer, "sync disconnect").await;
770 });
771 }
772
773 match nc.p2p_control() {
776 Some(raw) => match self.fetch_channel {
777 Some(ref sender) => {
778 if !sender.is_full() {
779 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
780 let _ignore = sender.try_send(FetchCMD {
781 peers,
782 ibd_state: ibd,
783 });
784 }
785 }
786 None => {
787 let p2p_control = raw.clone();
788 let (sender, recv) = channel::bounded(2);
789 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
790 sender
791 .send(FetchCMD {
792 peers,
793 ibd_state: ibd,
794 })
795 .unwrap();
796 self.fetch_channel = Some(sender);
797 let thread = ::std::thread::Builder::new();
798 let number = self.shared.state().shared_best_header_ref().number();
799 const THREAD_NAME: &str = "BlockDownload";
800 let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
801 let blockdownload_jh = thread
802 .name(THREAD_NAME.into())
803 .spawn(move || {
804 let stop_signal = new_crossbeam_exit_rx();
805 BlockFetchCMD {
806 sync_shared,
807 p2p_control,
808 recv,
809 number,
810 can_start: CanStart::MinWorkNotReach,
811 start_timestamp: unix_time_as_millis(),
812 }
813 .run(stop_signal);
814 })
815 .expect("download thread can't start");
816 register_thread(THREAD_NAME, blockdownload_jh);
817 }
818 },
819 None => {
820 for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
821 if let Some(fetch) =
822 tokio::task::block_in_place(|| self.get_blocks_to_fetch(peer, ibd))
823 {
824 for item in fetch {
825 self.send_getblocks(item, nc, peer);
826 }
827 }
828 }
829 }
830 }
831 }
832
833 fn send_getblocks(
834 &self,
835 v_fetch: Vec<packed::Byte32>,
836 nc: &Arc<dyn CKBProtocolContext + Sync>,
837 peer: PeerIndex,
838 ) {
839 let content = packed::GetBlocks::new_builder()
840 .block_hashes(v_fetch.clone())
841 .build();
842 let message = packed::SyncMessage::new_builder().set(content).build();
843
844 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
845 let nc = Arc::clone(nc);
846 self.shared.shared().async_handle().spawn(async move {
847 let _status = async_send_message_to(&nc, peer, &message).await;
848 });
849 }
850}
851
852#[async_trait]
853impl CKBProtocolHandler for Synchronizer {
854 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
855 nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
857 .await
858 .expect("set_notify at init is ok");
859 nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
860 .await
861 .expect("set_notify at init is ok");
862 nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
863 .await
864 .expect("set_notify at init is ok");
865 nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
866 .await
867 .expect("set_notify at init is ok");
868 nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
869 .await
870 .expect("set_notify at init is ok");
871 }
872
/// Decodes an incoming sync message, bans the sender if it is malformed and
/// otherwise processes it.
///
/// Decoding happens in two stages: the message is first parsed with
/// `from_compatible_slice`. A `SendBlock` is then accepted unless it reports
/// extra fields itself or its block reports more than one extra field; every
/// other message kind must additionally pass a second parse with `from_slice`.
/// Any failure bans the peer for `BAD_MESSAGE_BAN_TIME` and drops the message.
async fn received(
    &mut self,
    nc: Arc<dyn CKBProtocolContext + Sync>,
    peer_index: PeerIndex,
    data: Bytes,
) {
    let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
        Ok(msg) => {
            let item = msg.to_enum();
            if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
                // SendBlock: no extra fields on the message, at most one on the block.
                if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
                    info!(
                        "A malformed message from peer {}: \
                         excessive fields detected in SendBlock",
                        peer_index
                    );
                    nc.ban_peer(
                        peer_index,
                        BAD_MESSAGE_BAN_TIME,
                        String::from(
                            "send us a malformed message: \
                             too many fields in SendBlock",
                        ),
                    );
                    return;
                } else {
                    item
                }
            } else {
                // Every other message kind has to pass the second parse with
                // `from_slice` (presumably the strict variant, judging by the ban
                // reason below; confirm against the reader API).
                match packed::SyncMessageReader::from_slice(&data) {
                    Ok(msg) => msg.to_enum(),
                    _ => {
                        info!(
                            "A malformed message from peer {}: \
                             excessive fields",
                            peer_index
                        );
                        nc.ban_peer(
                            peer_index,
                            BAD_MESSAGE_BAN_TIME,
                            String::from(
                                "send us a malformed message: \
                                 too many fields",
                            ),
                        );
                        return;
                    }
                }
            }
        }
        _ => {
            // The first parse already failed.
            info!("A malformed message from peer {}", peer_index);
            nc.ban_peer(
                peer_index,
                BAD_MESSAGE_BAN_TIME,
                String::from("send us a malformed message"),
            );
            return;
        }
    };

    debug!("Received msg {} from {}", msg.item_name(), peer_index);
    // Tag the sentry scope with the protocol and message name when sentry is enabled.
    #[cfg(feature = "with_sentry")]
    {
        let sentry_hub = sentry::Hub::current();
        let _scope_guard = sentry_hub.push_scope();
        sentry_hub.configure_scope(|scope| {
            scope.set_tag("p2p.protocol", "synchronizer");
            scope.set_tag("p2p.message", msg.item_name());
        });
    }

    let start_time = Instant::now();
    self.process(nc, peer_index, msg).await;
    debug!(
        "Process message={}, peer={}, cost={:?}",
        msg.item_name(),
        peer_index,
        Instant::now().saturating_duration_since(start_time),
    );
}
954
955 async fn connected(
956 &mut self,
957 nc: Arc<dyn CKBProtocolContext + Sync>,
958 peer_index: PeerIndex,
959 _version: &str,
960 ) {
961 info!("SyncProtocol.connected peer={}", peer_index);
962 self.on_connected(nc.as_ref(), peer_index);
963 }
964
965 async fn disconnected(
966 &mut self,
967 _nc: Arc<dyn CKBProtocolContext + Sync>,
968 peer_index: PeerIndex,
969 ) {
970 let sync_state = self.shared().state();
971 sync_state.disconnected(peer_index);
972 info!("SyncProtocol.disconnected peer={}", peer_index);
973 }
974
975 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
976 if !self.peers().state.is_empty() {
977 let start_time = Instant::now();
978 trace!("Start notify token={}", token);
979 match token {
980 SEND_GET_HEADERS_TOKEN => {
981 self.start_sync_headers(&nc);
982 }
983 IBD_BLOCK_FETCH_TOKEN => {
984 if self.shared.active_chain().is_initial_block_download() {
985 self.find_blocks_to_fetch(&nc, IBDState::In);
986 } else {
987 {
988 self.shared.state().write_inflight_blocks().adjustment = false;
989 }
990 self.shared.state().peers().clear_unknown_list();
991 if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
992 trace!("Ibd block fetch token removal failed");
993 }
994 }
995 }
996 NOT_IBD_BLOCK_FETCH_TOKEN => {
997 if !self.shared.active_chain().is_initial_block_download() {
998 self.find_blocks_to_fetch(&nc, IBDState::Out);
999 }
1000 }
1001 TIMEOUT_EVICTION_TOKEN => {
1002 self.eviction(&nc);
1003 }
1004 _ => {}
1006 }
1007
1008 trace!(
1009 "Finished notify token={} cost={:?}",
1010 token,
1011 Instant::now().saturating_duration_since(start_time)
1012 );
1013 } else if token == NO_PEER_CHECK_TOKEN {
1014 debug!("No peers connected");
1015 }
1016 }
1017}