1mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{post_sync_process, HeadersSyncController, IBDState, Peers, SyncShared};
24use crate::utils::{metric_ckb_message_bytes, send_message_to, MetricDirection};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{select, Receiver};
31use ckb_constant::sync::{
32 BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33 INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38 async_trait, bytes::Bytes, tokio, CKBProtocolContext, CKBProtocolHandler, PeerIndex,
39 ServiceControl, SupportProtocols,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48 core::BlockNumber,
49 packed::{self, Byte32},
50 prelude::*,
51};
52use std::{
53 collections::HashSet,
54 sync::{atomic::Ordering, Arc},
55 time::{Duration, Instant},
56};
57
58pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
59pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
60pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
61pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
62pub const NO_PEER_CHECK_TOKEN: u64 = 255;
63
64const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
65const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
66const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
68#[derive(Copy, Clone)]
69enum CanStart {
70 FetchToTarget(BlockNumber),
71 Ready,
72 MinWorkNotReach,
73 AssumeValidNotFound,
74}
75
76struct FetchCMD {
77 peers: Vec<PeerIndex>,
78 ibd_state: IBDState,
79}
80
81struct BlockFetchCMD {
82 sync_shared: Arc<SyncShared>,
83 p2p_control: ServiceControl,
84 recv: channel::Receiver<FetchCMD>,
85 can_start: CanStart,
86 number: BlockNumber,
87 start_timestamp: u64,
88}
89
90impl BlockFetchCMD {
91 fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92 let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94 let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95 for peer in peers {
96 if ckb_stop_handler::has_received_stop_signal() {
97 return;
98 }
99
100 let mut fetch_end: BlockNumber = u64::MAX;
101 if assume_target != 0 {
102 fetch_end = assume_target
103 }
104
105 if let Some(fetch) =
106 BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107 .fetch(fetch_end)
108 {
109 for item in fetch {
110 if ckb_stop_handler::has_received_stop_signal() {
111 return;
112 }
113 BlockFetchCMD::send_getblocks(item, &cmd.p2p_control, peer);
114 }
115 }
116 }
117 };
118
119 match self.can_start() {
120 CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
121 CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
122 CanStart::MinWorkNotReach => {
123 let best_known = self.sync_shared.state().shared_best_header_ref();
124 let number = best_known.number();
125 if number != self.number && (number - self.number) % 10000 == 0 {
126 self.number = number;
127 info!(
128 "The current best known header number: {}, total difficulty: {:#x}. \
129 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
130 number,
131 best_known.total_difficulty(),
132 self.sync_shared.state().min_chain_work()
133 );
134 }
135 }
136 CanStart::AssumeValidNotFound => {
137 let state = self.sync_shared.state();
138 let shared = self.sync_shared.shared();
139 let best_known = state.shared_best_header_ref();
140 let number = best_known.number();
141 let assume_valid_target: Byte32 = shared
142 .assume_valid_targets()
143 .as_ref()
144 .and_then(|targets| targets.first())
145 .map(Pack::pack)
146 .expect("assume valid target must exist");
147
148 if number != self.number && (number - self.number) % 10000 == 0 {
149 self.number = number;
150 let remaining_headers_sync_log = self.reaming_headers_sync_log();
151
152 info!(
153 "best known header {}-{}, \
154 CKB is syncing to latest Header to find the assume valid target: {}. \
155 Please wait. {}",
156 number,
157 best_known.hash(),
158 assume_valid_target,
159 remaining_headers_sync_log
160 );
161 }
162 }
163 }
164 }
165
166 fn reaming_headers_sync_log(&self) -> String {
167 if let Some(remaining_headers_needed) = self.calc_time_need_to_reach_latest_tip_header() {
168 format!(
169 "Need {} minutes to sync to the latest Header.",
170 remaining_headers_needed.as_secs() / 60
171 )
172 } else {
173 "".to_string()
174 }
175 }
176
177 fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
189 let genesis_timestamp = self
190 .sync_shared
191 .consensus()
192 .genesis_block()
193 .header()
194 .timestamp();
195 let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
196
197 let ckb_process_start_timestamp = self.start_timestamp;
198
199 let now_timestamp = unix_time_as_millis();
200
201 let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)?;
202
203 let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)?;
204
205 let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)?;
206
207 let ckb_sync_header_speed = has_synced_headers_age.checked_div(ckb_process_age)?;
208
209 let sync_all_headers_timecost = ckb_chain_age.checked_div(ckb_sync_header_speed)?;
210
211 let sync_remaining_headers_needed =
212 sync_all_headers_timecost.checked_sub(ckb_process_age)?;
213
214 Some(Duration::from_millis(sync_remaining_headers_needed))
215 }
216
217 fn run(&mut self, stop_signal: Receiver<()>) {
218 loop {
219 select! {
220 recv(self.recv) -> msg => {
221 if let Ok(cmd) = msg {
222 self.process_fetch_cmd(cmd)
223 }
224 }
225 recv(stop_signal) -> _ => {
226 info!("BlockDownload received exit signal, exit now");
227 return;
228 }
229 }
230 }
231 }
232
233 fn can_start(&mut self) -> CanStart {
234 if let CanStart::Ready = self.can_start {
235 return self.can_start;
236 }
237
238 let shared = self.sync_shared.shared();
239 let state = self.sync_shared.state();
240
241 let min_work_reach = |flag: &mut CanStart| {
242 if state.min_chain_work_ready() {
243 *flag = CanStart::AssumeValidNotFound;
244 }
245 };
246
247 let assume_valid_target_find = |flag: &mut CanStart| {
248 let mut assume_valid_targets = shared.assume_valid_targets();
249 if let Some(ref targets) = *assume_valid_targets {
250 if targets.is_empty() {
251 assume_valid_targets.take();
252 *flag = CanStart::Ready;
253 return;
254 }
255 let first_target = targets
256 .first()
257 .expect("has checked targets is not empty, assume valid target must exist");
258 match shared.header_map().get(&first_target.pack()) {
259 Some(header) => {
260 if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
261 {
262 } else {
264 *flag = CanStart::FetchToTarget(header.number());
265 info!("assume valid target found in header_map; CKB will start fetch blocks to {:?} now", header.number_and_hash());
266 }
267 if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
269 assume_valid_targets.take();
270 warn!("the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on");
271 }
272 }
273 None => {
274 if unix_time_as_millis()
276 .saturating_sub(state.shared_best_header_ref().timestamp())
277 < MAX_TIP_AGE
278 {
279 warn!("the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on");
280 *flag = CanStart::Ready;
281 assume_valid_targets.take();
282 }
283 }
284 }
285 } else {
286 *flag = CanStart::Ready;
287 }
288 };
289
290 match self.can_start {
291 CanStart::FetchToTarget(_) => {
292 assume_valid_target_find(&mut self.can_start);
293 self.can_start
294 }
295 CanStart::Ready => self.can_start,
296 CanStart::MinWorkNotReach => {
297 min_work_reach(&mut self.can_start);
298 if let CanStart::AssumeValidNotFound = self.can_start {
299 assume_valid_target_find(&mut self.can_start);
300 }
301 self.can_start
302 }
303 CanStart::AssumeValidNotFound => {
304 assume_valid_target_find(&mut self.can_start);
305 self.can_start
306 }
307 }
308 }
309
310 fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: &ServiceControl, peer: PeerIndex) {
311 let content = packed::GetBlocks::new_builder()
312 .block_hashes(v_fetch.clone().pack())
313 .build();
314 let message = packed::SyncMessage::new_builder().set(content).build();
315
316 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
317 if let Err(err) = nc.send_message_to(
318 peer,
319 SupportProtocols::Sync.protocol_id(),
320 message.as_bytes(),
321 ) {
322 debug!("synchronizer sending GetBlocks error: {:?}", err);
323 }
324 }
325}
326
327pub struct Synchronizer {
329 pub(crate) chain: ChainController,
330 pub shared: Arc<SyncShared>,
332 fetch_channel: Option<channel::Sender<FetchCMD>>,
333}
334
335impl Synchronizer {
336 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
340 Synchronizer {
341 chain,
342 shared,
343 fetch_channel: None,
344 }
345 }
346
347 pub fn shared(&self) -> &Arc<SyncShared> {
349 &self.shared
350 }
351
352 fn try_process(
353 &self,
354 nc: Arc<dyn CKBProtocolContext + Sync>,
355 peer: PeerIndex,
356 message: packed::SyncMessageUnionReader<'_>,
357 ) -> Status {
358 let _trace_timecost: Option<HistogramTimer> = {
359 ckb_metrics::handle().map(|handle| {
360 handle
361 .ckb_sync_msg_process_duration
362 .with_label_values(&[message.item_name()])
363 .start_timer()
364 })
365 };
366
367 match message {
368 packed::SyncMessageUnionReader::GetHeaders(reader) => {
369 GetHeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
370 }
371 packed::SyncMessageUnionReader::SendHeaders(reader) => {
372 HeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
373 }
374 packed::SyncMessageUnionReader::GetBlocks(reader) => {
375 GetBlocksProcess::new(reader, self, peer, nc.as_ref()).execute()
376 }
377 packed::SyncMessageUnionReader::SendBlock(reader) => {
378 if reader.check_data() {
379 BlockProcess::new(reader, self, peer, nc).execute()
380 } else {
381 StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
382 }
383 }
384 packed::SyncMessageUnionReader::InIBD(_) => {
385 InIBDProcess::new(self, peer, nc.as_ref()).execute()
386 }
387 }
388 }
389
390 fn process(
391 &self,
392 nc: Arc<dyn CKBProtocolContext + Sync>,
393 peer: PeerIndex,
394 message: packed::SyncMessageUnionReader<'_>,
395 ) {
396 let item_name = message.item_name();
397 let item_bytes = message.as_slice().len() as u64;
398 let status = self.try_process(Arc::clone(&nc), peer, message);
399
400 metric_ckb_message_bytes(
401 MetricDirection::In,
402 &SupportProtocols::Sync.name(),
403 item_name,
404 Some(status.code()),
405 item_bytes,
406 );
407
408 post_sync_process(nc.as_ref(), peer, item_name, status);
409 }
410
411 pub fn peers(&self) -> &Peers {
413 self.shared().state().peers()
414 }
415
416 fn better_tip_header(&self) -> HeaderIndexView {
417 let (header, total_difficulty) = {
418 let active_chain = self.shared.active_chain();
419 (
420 active_chain.tip_header(),
421 active_chain.total_difficulty().to_owned(),
422 )
423 };
424 let best_known = self.shared.state().shared_best_header();
425 if total_difficulty > *best_known.total_difficulty() {
427 (header, total_difficulty).into()
428 } else {
429 best_known
430 }
431 }
432
433 pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
436 let block_hash = remote_block.block.hash();
437 let status = self.shared.active_chain().get_block_status(&block_hash);
438 if status.contains(BlockStatus::BLOCK_STORED) {
441 error!("Block {} already stored", block_hash);
442 } else if status.contains(BlockStatus::HEADER_VALID) {
443 self.shared.accept_remote_block(&self.chain, remote_block);
444 } else {
445 debug!(
446 "Synchronizer process_new_block unexpected status {:?} {}",
447 status, block_hash,
448 );
449 }
451 }
452
453 #[cfg(test)]
454 pub fn blocking_process_new_block(
455 &self,
456 block: core::BlockView,
457 _peer_id: PeerIndex,
458 ) -> Result<bool, ckb_error::Error> {
459 let block_hash = block.hash();
460 let status = self.shared.active_chain().get_block_status(&block_hash);
461 if status.contains(BlockStatus::BLOCK_STORED) {
464 error!("block {} already stored", block_hash);
465 Ok(false)
466 } else if status.contains(BlockStatus::HEADER_VALID) {
467 self.chain.blocking_process_block(Arc::new(block))
468 } else {
469 debug!(
470 "Synchronizer process_new_block unexpected status {:?} {}",
471 status, block_hash,
472 );
473 Ok(false)
475 }
476 }
477
478 pub fn get_blocks_to_fetch(
480 &self,
481 peer: PeerIndex,
482 ibd: IBDState,
483 ) -> Option<Vec<Vec<packed::Byte32>>> {
484 BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
485 }
486
487 pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
488 let pid = SupportProtocols::Sync.protocol_id();
489 let (is_outbound, is_whitelist, is_2023edition) = nc
490 .get_peer(peer)
491 .map(|peer| {
492 (
493 peer.is_outbound(),
494 peer.is_whitelist,
495 peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
496 )
497 })
498 .unwrap_or((false, false, false));
499
500 self.peers()
501 .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
502 }
503
504 pub fn eviction(&self, nc: &dyn CKBProtocolContext) {
514 let active_chain = self.shared.active_chain();
515 let mut eviction = Vec::new();
516 let better_tip_header = self.better_tip_header();
517 for mut kv_pair in self.peers().state.iter_mut() {
518 let (peer, state) = kv_pair.pair_mut();
519 let now = unix_time_as_millis();
520
521 if let Some(ref mut controller) = state.headers_sync_controller {
522 let better_tip_ts = better_tip_header.timestamp();
523 if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
524 if is_timeout {
525 eviction.push(*peer);
526 continue;
527 }
528 } else {
529 active_chain.send_getheaders_to_peer(
530 nc,
531 *peer,
532 better_tip_header.number_and_hash(),
533 );
534 }
535 }
536
537 if active_chain.is_initial_block_download() {
544 continue;
545 }
546 if state.peer_flags.is_outbound {
547 let best_known_header = state.best_known_header.as_ref();
548 let (tip_header, local_total_difficulty) = {
549 (
550 active_chain.tip_header().to_owned(),
551 active_chain.total_difficulty().to_owned(),
552 )
553 };
554 if best_known_header
555 .map(|header_index| header_index.total_difficulty().clone())
556 .unwrap_or_default()
557 >= local_total_difficulty
558 {
559 if state.chain_sync.timeout != 0 {
560 state.chain_sync.timeout = 0;
561 state.chain_sync.work_header = None;
562 state.chain_sync.total_difficulty = None;
563 state.chain_sync.sent_getheaders = false;
564 }
565 } else if state.chain_sync.timeout == 0
566 || (best_known_header.is_some()
567 && best_known_header
568 .map(|header_index| header_index.total_difficulty().clone())
569 >= state.chain_sync.total_difficulty)
570 {
571 state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
576 state.chain_sync.work_header = Some(tip_header);
577 state.chain_sync.total_difficulty = Some(local_total_difficulty);
578 state.chain_sync.sent_getheaders = false;
579 } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
580 if state.chain_sync.sent_getheaders {
584 if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
585 if state.sync_started() {
586 self.shared().state().suspend_sync(state);
587 }
588 } else {
589 eviction.push(*peer);
590 }
591 } else {
592 state.chain_sync.sent_getheaders = true;
593 state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
594 active_chain.send_getheaders_to_peer(
595 nc,
596 *peer,
597 state
598 .chain_sync
599 .work_header
600 .as_ref()
601 .expect("work_header be assigned")
602 .into(),
603 );
604 }
605 }
606 }
607 }
608 for peer in eviction {
609 info!("Timeout eviction peer={}", peer);
610 if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
611 debug!("synchronizer disconnect error: {:?}", err);
612 }
613 }
614 }
615
616 fn start_sync_headers(&self, nc: &dyn CKBProtocolContext) {
617 let now = unix_time_as_millis();
618 let active_chain = self.shared.active_chain();
619 let ibd = active_chain.is_initial_block_download();
620 let peers: Vec<PeerIndex> = self
621 .peers()
622 .state
623 .iter()
624 .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
625 .map(|kv_pair| *kv_pair.key())
626 .collect();
627
628 if peers.is_empty() {
629 return;
630 }
631
632 let tip = self.better_tip_header();
633
634 for peer in peers {
635 if self
637 .shared()
638 .state()
639 .n_sync_started()
640 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
641 if ibd && x != 0 {
642 None
643 } else {
644 Some(x + 1)
645 }
646 })
647 .is_err()
648 {
649 break;
650 }
651 {
652 if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
653 peer_state.start_sync(HeadersSyncController::from_header(&tip));
654 }
655 }
656
657 debug!("Start sync peer={}", peer);
658 active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
659 }
660 }
661
662 fn get_peers_to_fetch(
663 &self,
664 ibd: IBDState,
665 disconnect_list: &HashSet<PeerIndex>,
666 ) -> Vec<PeerIndex> {
667 trace!("Poll find_blocks_to_fetch selecting peers");
668 let state = &self
669 .shared
670 .state()
671 .read_inflight_blocks()
672 .download_schedulers;
673 let mut peers: Vec<PeerIndex> = self
674 .peers()
675 .state
676 .iter()
677 .filter(|kv_pair| {
678 let (id, state) = kv_pair.pair();
679 if disconnect_list.contains(id) {
680 return false;
681 };
682 match ibd {
683 IBDState::In => {
684 state.peer_flags.is_outbound
685 || state.peer_flags.is_whitelist
686 || state.peer_flags.is_protect
687 }
688 IBDState::Out => state.started_or_tip_synced(),
689 }
690 })
691 .map(|kv_pair| *kv_pair.key())
692 .collect();
693 peers.sort_by_key(|id| {
694 ::std::cmp::Reverse(
695 state
696 .get(id)
697 .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
698 )
699 });
700 peers
701 }
702
703 fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) {
704 if self.chain.is_verifying_unverified_blocks_on_startup() {
705 trace!(
706 "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
707 );
708 return;
709 }
710
711 if ckb_stop_handler::has_received_stop_signal() {
712 info!("received stop signal, stop find_blocks_to_fetch");
713 return;
714 }
715
716 let unverified_tip = self.shared.active_chain().unverified_tip_number();
717
718 let disconnect_list = {
719 let mut list = self
720 .shared()
721 .state()
722 .write_inflight_blocks()
723 .prune(unverified_tip);
724 if let IBDState::In = ibd {
725 list.extend(
728 self.shared
729 .state()
730 .peers()
731 .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
732 )
733 };
734 list
735 };
736
737 for peer in disconnect_list.iter() {
738 if self
744 .peers()
745 .get_flag(*peer)
746 .map(|flag| flag.is_whitelist)
747 .unwrap_or(false)
748 {
749 continue;
750 }
751 if let Err(err) = nc.disconnect(*peer, "sync disconnect") {
752 debug!("synchronizer disconnect error: {:?}", err);
753 }
754 }
755
756 match nc.p2p_control() {
759 Some(raw) => match self.fetch_channel {
760 Some(ref sender) => {
761 if !sender.is_full() {
762 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
763 let _ignore = sender.try_send(FetchCMD {
764 peers,
765 ibd_state: ibd,
766 });
767 }
768 }
769 None => {
770 let p2p_control = raw.clone();
771 let (sender, recv) = channel::bounded(2);
772 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
773 sender
774 .send(FetchCMD {
775 peers,
776 ibd_state: ibd,
777 })
778 .unwrap();
779 self.fetch_channel = Some(sender);
780 let thread = ::std::thread::Builder::new();
781 let number = self.shared.state().shared_best_header_ref().number();
782 const THREAD_NAME: &str = "BlockDownload";
783 let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
784 let blockdownload_jh = thread
785 .name(THREAD_NAME.into())
786 .spawn(move || {
787 let stop_signal = new_crossbeam_exit_rx();
788 BlockFetchCMD {
789 sync_shared,
790 p2p_control,
791 recv,
792 number,
793 can_start: CanStart::MinWorkNotReach,
794 start_timestamp: unix_time_as_millis(),
795 }
796 .run(stop_signal);
797 })
798 .expect("download thread can't start");
799 register_thread(THREAD_NAME, blockdownload_jh);
800 }
801 },
802 None => {
803 for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
804 if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
805 for item in fetch {
806 self.send_getblocks(item, nc, peer);
807 }
808 }
809 }
810 }
811 }
812 }
813
814 fn send_getblocks(
815 &self,
816 v_fetch: Vec<packed::Byte32>,
817 nc: &dyn CKBProtocolContext,
818 peer: PeerIndex,
819 ) {
820 let content = packed::GetBlocks::new_builder()
821 .block_hashes(v_fetch.clone().pack())
822 .build();
823 let message = packed::SyncMessage::new_builder().set(content).build();
824
825 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
826 let _status = send_message_to(nc, peer, &message);
827 }
828}
829
830#[async_trait]
831impl CKBProtocolHandler for Synchronizer {
832 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
833 nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
835 .await
836 .expect("set_notify at init is ok");
837 nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
838 .await
839 .expect("set_notify at init is ok");
840 nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
841 .await
842 .expect("set_notify at init is ok");
843 nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
844 .await
845 .expect("set_notify at init is ok");
846 nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
847 .await
848 .expect("set_notify at init is ok");
849 }
850
851 async fn received(
852 &mut self,
853 nc: Arc<dyn CKBProtocolContext + Sync>,
854 peer_index: PeerIndex,
855 data: Bytes,
856 ) {
857 let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
858 Ok(msg) => {
859 let item = msg.to_enum();
860 if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
861 if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
862 info!(
863 "A malformed message from peer {}: \
864 excessive fields detected in SendBlock",
865 peer_index
866 );
867 nc.ban_peer(
868 peer_index,
869 BAD_MESSAGE_BAN_TIME,
870 String::from(
871 "send us a malformed message: \
872 too many fields in SendBlock",
873 ),
874 );
875 return;
876 } else {
877 item
878 }
879 } else {
880 match packed::SyncMessageReader::from_slice(&data) {
881 Ok(msg) => msg.to_enum(),
882 _ => {
883 info!(
884 "A malformed message from peer {}: \
885 excessive fields",
886 peer_index
887 );
888 nc.ban_peer(
889 peer_index,
890 BAD_MESSAGE_BAN_TIME,
891 String::from(
892 "send us a malformed message: \
893 too many fields",
894 ),
895 );
896 return;
897 }
898 }
899 }
900 }
901 _ => {
902 info!("A malformed message from peer {}", peer_index);
903 nc.ban_peer(
904 peer_index,
905 BAD_MESSAGE_BAN_TIME,
906 String::from("send us a malformed message"),
907 );
908 return;
909 }
910 };
911
912 debug!("Received msg {} from {}", msg.item_name(), peer_index);
913 #[cfg(feature = "with_sentry")]
914 {
915 let sentry_hub = sentry::Hub::current();
916 let _scope_guard = sentry_hub.push_scope();
917 sentry_hub.configure_scope(|scope| {
918 scope.set_tag("p2p.protocol", "synchronizer");
919 scope.set_tag("p2p.message", msg.item_name());
920 });
921 }
922
923 let start_time = Instant::now();
924 tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
925 debug!(
926 "Process message={}, peer={}, cost={:?}",
927 msg.item_name(),
928 peer_index,
929 Instant::now().saturating_duration_since(start_time),
930 );
931 }
932
933 async fn connected(
934 &mut self,
935 nc: Arc<dyn CKBProtocolContext + Sync>,
936 peer_index: PeerIndex,
937 _version: &str,
938 ) {
939 info!("SyncProtocol.connected peer={}", peer_index);
940 self.on_connected(nc.as_ref(), peer_index);
941 }
942
943 async fn disconnected(
944 &mut self,
945 _nc: Arc<dyn CKBProtocolContext + Sync>,
946 peer_index: PeerIndex,
947 ) {
948 let sync_state = self.shared().state();
949 sync_state.disconnected(peer_index);
950 info!("SyncProtocol.disconnected peer={}", peer_index);
951 }
952
953 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
954 if !self.peers().state.is_empty() {
955 let start_time = Instant::now();
956 trace!("Start notify token={}", token);
957 match token {
958 SEND_GET_HEADERS_TOKEN => {
959 self.start_sync_headers(nc.as_ref());
960 }
961 IBD_BLOCK_FETCH_TOKEN => {
962 if self.shared.active_chain().is_initial_block_download() {
963 self.find_blocks_to_fetch(nc.as_ref(), IBDState::In);
964 } else {
965 {
966 self.shared.state().write_inflight_blocks().adjustment = false;
967 }
968 self.shared.state().peers().clear_unknown_list();
969 if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
970 trace!("Ibd block fetch token removal failed");
971 }
972 }
973 }
974 NOT_IBD_BLOCK_FETCH_TOKEN => {
975 if !self.shared.active_chain().is_initial_block_download() {
976 self.find_blocks_to_fetch(nc.as_ref(), IBDState::Out);
977 }
978 }
979 TIMEOUT_EVICTION_TOKEN => {
980 self.eviction(nc.as_ref());
981 }
982 _ => {}
984 }
985
986 trace!(
987 "Finished notify token={} cost={:?}",
988 token,
989 Instant::now().saturating_duration_since(start_time)
990 );
991 } else if token == NO_PEER_CHECK_TOKEN {
992 debug!("No peers connected");
993 }
994 }
995}