1mod block_fetcher;
10mod block_process;
11mod get_blocks_process;
12mod get_headers_process;
13mod headers_process;
14mod in_ibd_process;
15
16pub(crate) use self::block_fetcher::BlockFetcher;
17pub(crate) use self::block_process::BlockProcess;
18pub(crate) use self::get_blocks_process::GetBlocksProcess;
19pub(crate) use self::get_headers_process::GetHeadersProcess;
20pub(crate) use self::headers_process::HeadersProcess;
21pub(crate) use self::in_ibd_process::InIBDProcess;
22
23use crate::types::{HeadersSyncController, IBDState, Peers, SyncShared, post_sync_process};
24use crate::utils::{MetricDirection, metric_ckb_message_bytes, send_message_to};
25use crate::{Status, StatusCode};
26use ckb_shared::block_status::BlockStatus;
27
28use ckb_chain::{ChainController, RemoteBlock};
29use ckb_channel as channel;
30use ckb_channel::{Receiver, select};
31use ckb_constant::sync::{
32 BAD_MESSAGE_BAN_TIME, CHAIN_SYNC_TIMEOUT, EVICTION_HEADERS_RESPONSE_TIME,
33 INIT_BLOCKS_IN_TRANSIT_PER_PEER, MAX_TIP_AGE,
34};
35use ckb_logger::{debug, error, info, trace, warn};
36use ckb_metrics::HistogramTimer;
37use ckb_network::{
38 CKBProtocolContext, CKBProtocolHandler, PeerIndex, ServiceControl, SupportProtocols,
39 async_trait, bytes::Bytes, tokio,
40};
41use ckb_shared::types::HeaderIndexView;
42use ckb_stop_handler::{new_crossbeam_exit_rx, register_thread};
43use ckb_systemtime::unix_time_as_millis;
44
45#[cfg(test)]
46use ckb_types::core;
47use ckb_types::{
48 core::BlockNumber,
49 packed::{self, Byte32},
50 prelude::*,
51};
52use std::{
53 collections::HashSet,
54 sync::{Arc, atomic::Ordering},
55 time::{Duration, Instant},
56};
57
58pub const SEND_GET_HEADERS_TOKEN: u64 = 0;
59pub const IBD_BLOCK_FETCH_TOKEN: u64 = 1;
60pub const NOT_IBD_BLOCK_FETCH_TOKEN: u64 = 2;
61pub const TIMEOUT_EVICTION_TOKEN: u64 = 3;
62pub const NO_PEER_CHECK_TOKEN: u64 = 255;
63
64const SYNC_NOTIFY_INTERVAL: Duration = Duration::from_secs(1);
65const IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(40);
66const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);
67
68#[derive(Copy, Clone)]
69enum CanStart {
70 FetchToTarget(BlockNumber),
71 Ready,
72 MinWorkNotReach,
73 AssumeValidNotFound,
74}
75
76struct FetchCMD {
77 peers: Vec<PeerIndex>,
78 ibd_state: IBDState,
79}
80
81struct BlockFetchCMD {
82 sync_shared: Arc<SyncShared>,
83 p2p_control: ServiceControl,
84 recv: channel::Receiver<FetchCMD>,
85 can_start: CanStart,
86 number: BlockNumber,
87 start_timestamp: u64,
88}
89
90impl BlockFetchCMD {
91 fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
92 let FetchCMD { peers, ibd_state }: FetchCMD = cmd;
93
94 let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
95 for peer in peers {
96 if ckb_stop_handler::has_received_stop_signal() {
97 return;
98 }
99
100 let mut fetch_end: BlockNumber = u64::MAX;
101 if assume_target != 0 {
102 fetch_end = assume_target
103 }
104
105 if let Some(fetch) =
106 BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
107 .fetch(fetch_end)
108 {
109 for item in fetch {
110 if ckb_stop_handler::has_received_stop_signal() {
111 return;
112 }
113 BlockFetchCMD::send_getblocks(item, &cmd.p2p_control, peer);
114 }
115 }
116 }
117 };
118
119 match self.can_start() {
120 CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
121 CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
122 CanStart::MinWorkNotReach => {
123 let best_known = self.sync_shared.state().shared_best_header_ref();
124 let number = best_known.number();
125 if number != self.number && (number - self.number) % 10000 == 0 {
126 self.number = number;
127 info!(
128 "The current best known header number: {}, total difficulty: {:#x}. \
129 Block download minimum requirements: header number: 500_000, total difficulty: {:#x}.",
130 number,
131 best_known.total_difficulty(),
132 self.sync_shared.state().min_chain_work()
133 );
134 }
135 }
136 CanStart::AssumeValidNotFound => {
137 let state = self.sync_shared.state();
138 let shared = self.sync_shared.shared();
139 let best_known = state.shared_best_header_ref();
140 let number = best_known.number();
141 let assume_valid_target: Byte32 = shared
142 .assume_valid_targets()
143 .as_ref()
144 .and_then(|targets| targets.first())
145 .map(Pack::pack)
146 .expect("assume valid target must exist");
147
148 if number != self.number && (number - self.number) % 10000 == 0 {
149 self.number = number;
150 let remaining_headers_sync_log = self.reaming_headers_sync_log();
151
152 info!(
153 "best known header {}-{}, \
154 CKB is syncing to latest Header to find the assume valid target: {}. \
155 Please wait. {}",
156 number,
157 best_known.hash(),
158 assume_valid_target,
159 remaining_headers_sync_log
160 );
161 }
162 }
163 }
164 }
165
166 fn reaming_headers_sync_log(&self) -> String {
167 if let Some(remaining_headers_needed) = self.calc_time_need_to_reach_latest_tip_header() {
168 format!(
169 "Need {} minutes to sync to the latest Header.",
170 remaining_headers_needed.as_secs() / 60
171 )
172 } else {
173 "".to_string()
174 }
175 }
176
177 fn calc_time_need_to_reach_latest_tip_header(&self) -> Option<Duration> {
189 let genesis_timestamp = self
190 .sync_shared
191 .consensus()
192 .genesis_block()
193 .header()
194 .timestamp();
195 let shared_best_timestamp = self.sync_shared.state().shared_best_header().timestamp();
196
197 let ckb_process_start_timestamp = self.start_timestamp;
198
199 let now_timestamp = unix_time_as_millis();
200
201 let ckb_chain_age = now_timestamp.checked_sub(genesis_timestamp)?;
202
203 let ckb_process_age = now_timestamp.checked_sub(ckb_process_start_timestamp)?;
204
205 let has_synced_headers_age = shared_best_timestamp.checked_sub(genesis_timestamp)?;
206
207 let ckb_sync_header_speed = has_synced_headers_age.checked_div(ckb_process_age)?;
208
209 let sync_all_headers_timecost = ckb_chain_age.checked_div(ckb_sync_header_speed)?;
210
211 let sync_remaining_headers_needed =
212 sync_all_headers_timecost.checked_sub(ckb_process_age)?;
213
214 Some(Duration::from_millis(sync_remaining_headers_needed))
215 }
216
217 fn run(&mut self, stop_signal: Receiver<()>) {
218 loop {
219 select! {
220 recv(self.recv) -> msg => {
221 if let Ok(cmd) = msg {
222 self.process_fetch_cmd(cmd)
223 }
224 }
225 recv(stop_signal) -> _ => {
226 info!("BlockDownload received exit signal, exit now");
227 return;
228 }
229 }
230 }
231 }
232
233 fn can_start(&mut self) -> CanStart {
234 if let CanStart::Ready = self.can_start {
235 return self.can_start;
236 }
237
238 let shared = self.sync_shared.shared();
239 let state = self.sync_shared.state();
240
241 let min_work_reach = |flag: &mut CanStart| {
242 if state.min_chain_work_ready() {
243 *flag = CanStart::AssumeValidNotFound;
244 }
245 };
246
247 let assume_valid_target_find = |flag: &mut CanStart| {
248 let mut assume_valid_targets = shared.assume_valid_targets();
249 if let Some(ref targets) = *assume_valid_targets {
250 if targets.is_empty() {
251 assume_valid_targets.take();
252 *flag = CanStart::Ready;
253 return;
254 }
255 let first_target = targets
256 .first()
257 .expect("has checked targets is not empty, assume valid target must exist");
258 match shared.header_map().get(&first_target.into()) {
259 Some(header) => {
260 if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
261 {
262 } else {
264 *flag = CanStart::FetchToTarget(header.number());
265 info!(
266 "assume valid target found in header_map; CKB will start fetch blocks to {:?} now",
267 header.number_and_hash()
268 );
269 }
270 if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
272 assume_valid_targets.take();
273 warn!(
274 "the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on"
275 );
276 }
277 }
278 None => {
279 if unix_time_as_millis()
281 .saturating_sub(state.shared_best_header_ref().timestamp())
282 < MAX_TIP_AGE
283 {
284 warn!(
285 "the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on"
286 );
287 *flag = CanStart::Ready;
288 assume_valid_targets.take();
289 }
290 }
291 }
292 } else {
293 *flag = CanStart::Ready;
294 }
295 };
296
297 match self.can_start {
298 CanStart::FetchToTarget(_) => {
299 assume_valid_target_find(&mut self.can_start);
300 self.can_start
301 }
302 CanStart::Ready => self.can_start,
303 CanStart::MinWorkNotReach => {
304 min_work_reach(&mut self.can_start);
305 if let CanStart::AssumeValidNotFound = self.can_start {
306 assume_valid_target_find(&mut self.can_start);
307 }
308 self.can_start
309 }
310 CanStart::AssumeValidNotFound => {
311 assume_valid_target_find(&mut self.can_start);
312 self.can_start
313 }
314 }
315 }
316
317 fn send_getblocks(v_fetch: Vec<packed::Byte32>, nc: &ServiceControl, peer: PeerIndex) {
318 let content = packed::GetBlocks::new_builder()
319 .block_hashes(v_fetch.clone())
320 .build();
321 let message = packed::SyncMessage::new_builder().set(content).build();
322
323 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
324 if let Err(err) = nc.send_message_to(
325 peer,
326 SupportProtocols::Sync.protocol_id(),
327 message.as_bytes(),
328 ) {
329 debug!("synchronizer sending GetBlocks error: {:?}", err);
330 }
331 }
332}
333
334pub struct Synchronizer {
336 pub(crate) chain: ChainController,
337 pub shared: Arc<SyncShared>,
339 fetch_channel: Option<channel::Sender<FetchCMD>>,
340}
341
342impl Synchronizer {
343 pub fn new(chain: ChainController, shared: Arc<SyncShared>) -> Synchronizer {
347 Synchronizer {
348 chain,
349 shared,
350 fetch_channel: None,
351 }
352 }
353
354 pub fn shared(&self) -> &Arc<SyncShared> {
356 &self.shared
357 }
358
359 fn try_process(
360 &self,
361 nc: Arc<dyn CKBProtocolContext + Sync>,
362 peer: PeerIndex,
363 message: packed::SyncMessageUnionReader<'_>,
364 ) -> Status {
365 let _trace_timecost: Option<HistogramTimer> = {
366 ckb_metrics::handle().map(|handle| {
367 handle
368 .ckb_sync_msg_process_duration
369 .with_label_values(&[message.item_name()])
370 .start_timer()
371 })
372 };
373
374 match message {
375 packed::SyncMessageUnionReader::GetHeaders(reader) => {
376 GetHeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
377 }
378 packed::SyncMessageUnionReader::SendHeaders(reader) => {
379 HeadersProcess::new(reader, self, peer, nc.as_ref()).execute()
380 }
381 packed::SyncMessageUnionReader::GetBlocks(reader) => {
382 GetBlocksProcess::new(reader, self, peer, nc.as_ref()).execute()
383 }
384 packed::SyncMessageUnionReader::SendBlock(reader) => {
385 if reader.check_data() {
386 BlockProcess::new(reader, self, peer, nc).execute()
387 } else {
388 StatusCode::ProtocolMessageIsMalformed.with_context("SendBlock is invalid")
389 }
390 }
391 packed::SyncMessageUnionReader::InIBD(_) => {
392 InIBDProcess::new(self, peer, nc.as_ref()).execute()
393 }
394 }
395 }
396
397 fn process(
398 &self,
399 nc: Arc<dyn CKBProtocolContext + Sync>,
400 peer: PeerIndex,
401 message: packed::SyncMessageUnionReader<'_>,
402 ) {
403 let item_name = message.item_name();
404 let item_bytes = message.as_slice().len() as u64;
405 let status = self.try_process(Arc::clone(&nc), peer, message);
406
407 metric_ckb_message_bytes(
408 MetricDirection::In,
409 &SupportProtocols::Sync.name(),
410 item_name,
411 Some(status.code()),
412 item_bytes,
413 );
414
415 post_sync_process(nc.as_ref(), peer, item_name, status);
416 }
417
418 pub fn peers(&self) -> &Peers {
420 self.shared().state().peers()
421 }
422
423 fn better_tip_header(&self) -> HeaderIndexView {
424 let (header, total_difficulty) = {
425 let active_chain = self.shared.active_chain();
426 (
427 active_chain.tip_header(),
428 active_chain.total_difficulty().to_owned(),
429 )
430 };
431 let best_known = self.shared.state().shared_best_header();
432 if total_difficulty > *best_known.total_difficulty() {
434 (header, total_difficulty).into()
435 } else {
436 best_known
437 }
438 }
439
440 pub fn asynchronous_process_remote_block(&self, remote_block: RemoteBlock) {
443 let block_hash = remote_block.block.hash();
444 let status = self.shared.active_chain().get_block_status(&block_hash);
445 if status.contains(BlockStatus::BLOCK_STORED) {
448 error!("Block {} already stored", block_hash);
449 } else if status.contains(BlockStatus::HEADER_VALID) {
450 self.shared.accept_remote_block(&self.chain, remote_block);
451 } else {
452 debug!(
453 "Synchronizer process_new_block unexpected status {:?} {}",
454 status, block_hash,
455 );
456 }
458 }
459
460 #[cfg(test)]
462 pub fn blocking_process_new_block(
463 &self,
464 block: core::BlockView,
465 _peer_id: PeerIndex,
466 ) -> Result<bool, ckb_error::Error> {
467 let block_hash = block.hash();
468 let status = self.shared.active_chain().get_block_status(&block_hash);
469 if status.contains(BlockStatus::BLOCK_STORED) {
472 error!("block {} already stored", block_hash);
473 Ok(false)
474 } else if status.contains(BlockStatus::HEADER_VALID) {
475 self.chain.blocking_process_block(Arc::new(block))
476 } else {
477 debug!(
478 "Synchronizer process_new_block unexpected status {:?} {}",
479 status, block_hash,
480 );
481 Ok(false)
483 }
484 }
485
486 pub fn get_blocks_to_fetch(
488 &self,
489 peer: PeerIndex,
490 ibd: IBDState,
491 ) -> Option<Vec<Vec<packed::Byte32>>> {
492 BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
493 }
494
495 pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
496 let pid = SupportProtocols::Sync.protocol_id();
497 let (is_outbound, is_whitelist, is_2023edition) = nc
498 .get_peer(peer)
499 .map(|peer| {
500 (
501 peer.is_outbound(),
502 peer.is_whitelist,
503 peer.protocols.get(&pid).map(|v| v == "3").unwrap_or(false),
504 )
505 })
506 .unwrap_or((false, false, false));
507
508 self.peers()
509 .sync_connected(peer, is_outbound, is_whitelist, is_2023edition);
510 }
511
512 pub fn eviction(&self, nc: &dyn CKBProtocolContext) {
522 let active_chain = self.shared.active_chain();
523 let mut eviction = Vec::new();
524 let better_tip_header = self.better_tip_header();
525 for mut kv_pair in self.peers().state.iter_mut() {
526 let (peer, state) = kv_pair.pair_mut();
527 let now = unix_time_as_millis();
528
529 if let Some(ref mut controller) = state.headers_sync_controller {
530 let better_tip_ts = better_tip_header.timestamp();
531 if let Some(is_timeout) = controller.is_timeout(better_tip_ts, now) {
532 if is_timeout {
533 eviction.push(*peer);
534 continue;
535 }
536 } else {
537 active_chain.send_getheaders_to_peer(
538 nc,
539 *peer,
540 better_tip_header.number_and_hash(),
541 );
542 }
543 }
544
545 if active_chain.is_initial_block_download() {
552 continue;
553 }
554 if state.peer_flags.is_outbound {
555 let best_known_header = state.best_known_header.as_ref();
556 let (tip_header, local_total_difficulty) = {
557 (
558 active_chain.tip_header().to_owned(),
559 active_chain.total_difficulty().to_owned(),
560 )
561 };
562 if best_known_header
563 .map(|header_index| header_index.total_difficulty().clone())
564 .unwrap_or_default()
565 >= local_total_difficulty
566 {
567 if state.chain_sync.timeout != 0 {
568 state.chain_sync.timeout = 0;
569 state.chain_sync.work_header = None;
570 state.chain_sync.total_difficulty = None;
571 state.chain_sync.sent_getheaders = false;
572 }
573 } else if state.chain_sync.timeout == 0
574 || (best_known_header.is_some()
575 && best_known_header
576 .map(|header_index| header_index.total_difficulty().clone())
577 >= state.chain_sync.total_difficulty)
578 {
579 state.chain_sync.timeout = now + CHAIN_SYNC_TIMEOUT;
584 state.chain_sync.work_header = Some(tip_header);
585 state.chain_sync.total_difficulty = Some(local_total_difficulty);
586 state.chain_sync.sent_getheaders = false;
587 } else if state.chain_sync.timeout > 0 && now > state.chain_sync.timeout {
588 if state.chain_sync.sent_getheaders {
592 if state.peer_flags.is_protect || state.peer_flags.is_whitelist {
593 if state.sync_started() {
594 self.shared().state().suspend_sync(state);
595 }
596 } else {
597 eviction.push(*peer);
598 }
599 } else {
600 state.chain_sync.sent_getheaders = true;
601 state.chain_sync.timeout = now + EVICTION_HEADERS_RESPONSE_TIME;
602 active_chain.send_getheaders_to_peer(
603 nc,
604 *peer,
605 state
606 .chain_sync
607 .work_header
608 .as_ref()
609 .expect("work_header be assigned")
610 .into(),
611 );
612 }
613 }
614 }
615 }
616 for peer in eviction {
617 info!("Timeout eviction peer={}", peer);
618 if let Err(err) = nc.disconnect(peer, "sync timeout eviction") {
619 debug!("synchronizer disconnect error: {:?}", err);
620 }
621 }
622 }
623
624 fn start_sync_headers(&self, nc: &dyn CKBProtocolContext) {
625 let now = unix_time_as_millis();
626 let active_chain = self.shared.active_chain();
627 let ibd = active_chain.is_initial_block_download();
628 let peers: Vec<PeerIndex> = self
629 .peers()
630 .state
631 .iter()
632 .filter(|kv_pair| kv_pair.value().can_start_sync(now, ibd))
633 .map(|kv_pair| *kv_pair.key())
634 .collect();
635
636 if peers.is_empty() {
637 return;
638 }
639
640 let tip = self.better_tip_header();
641
642 for peer in peers {
643 if self
645 .shared()
646 .state()
647 .n_sync_started()
648 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |x| {
649 if ibd && x != 0 { None } else { Some(x + 1) }
650 })
651 .is_err()
652 {
653 break;
654 }
655 {
656 if let Some(mut peer_state) = self.peers().state.get_mut(&peer) {
657 peer_state.start_sync(HeadersSyncController::from_header(&tip));
658 }
659 }
660
661 debug!("Start sync peer={}", peer);
662 active_chain.send_getheaders_to_peer(nc, peer, tip.number_and_hash());
663 }
664 }
665
666 fn get_peers_to_fetch(
667 &self,
668 ibd: IBDState,
669 disconnect_list: &HashSet<PeerIndex>,
670 ) -> Vec<PeerIndex> {
671 trace!("Poll find_blocks_to_fetch selecting peers");
672 let state = &self
673 .shared
674 .state()
675 .read_inflight_blocks()
676 .download_schedulers;
677 let mut peers: Vec<PeerIndex> = self
678 .peers()
679 .state
680 .iter()
681 .filter(|kv_pair| {
682 let (id, state) = kv_pair.pair();
683 if disconnect_list.contains(id) {
684 return false;
685 };
686 match ibd {
687 IBDState::In => {
688 state.peer_flags.is_outbound
689 || state.peer_flags.is_whitelist
690 || state.peer_flags.is_protect
691 }
692 IBDState::Out => state.started_or_tip_synced(),
693 }
694 })
695 .map(|kv_pair| *kv_pair.key())
696 .collect();
697 peers.sort_by_key(|id| {
698 ::std::cmp::Reverse(
699 state
700 .get(id)
701 .map_or(INIT_BLOCKS_IN_TRANSIT_PER_PEER, |d| d.task_count()),
702 )
703 });
704 peers
705 }
706
707 fn find_blocks_to_fetch(&mut self, nc: &dyn CKBProtocolContext, ibd: IBDState) {
708 if self.chain.is_verifying_unverified_blocks_on_startup() {
709 trace!(
710 "skip find_blocks_to_fetch, ckb_chain is verifying unverified blocks on startup"
711 );
712 return;
713 }
714
715 if ckb_stop_handler::has_received_stop_signal() {
716 info!("received stop signal, stop find_blocks_to_fetch");
717 return;
718 }
719
720 let unverified_tip = self.shared.active_chain().unverified_tip_number();
721
722 let disconnect_list = {
723 let mut list = self
724 .shared()
725 .state()
726 .write_inflight_blocks()
727 .prune(unverified_tip);
728 if let IBDState::In = ibd {
729 list.extend(
732 self.shared
733 .state()
734 .peers()
735 .get_best_known_less_than_tip_and_unknown_empty(unverified_tip),
736 )
737 };
738 list
739 };
740
741 for peer in disconnect_list.iter() {
742 if self
748 .peers()
749 .get_flag(*peer)
750 .map(|flag| flag.is_whitelist)
751 .unwrap_or(false)
752 {
753 continue;
754 }
755 if let Err(err) = nc.disconnect(*peer, "sync disconnect") {
756 debug!("synchronizer disconnect error: {:?}", err);
757 }
758 }
759
760 match nc.p2p_control() {
763 Some(raw) => match self.fetch_channel {
764 Some(ref sender) => {
765 if !sender.is_full() {
766 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
767 let _ignore = sender.try_send(FetchCMD {
768 peers,
769 ibd_state: ibd,
770 });
771 }
772 }
773 None => {
774 let p2p_control = raw.clone();
775 let (sender, recv) = channel::bounded(2);
776 let peers = self.get_peers_to_fetch(ibd, &disconnect_list);
777 sender
778 .send(FetchCMD {
779 peers,
780 ibd_state: ibd,
781 })
782 .unwrap();
783 self.fetch_channel = Some(sender);
784 let thread = ::std::thread::Builder::new();
785 let number = self.shared.state().shared_best_header_ref().number();
786 const THREAD_NAME: &str = "BlockDownload";
787 let sync_shared: Arc<SyncShared> = Arc::to_owned(self.shared());
788 let blockdownload_jh = thread
789 .name(THREAD_NAME.into())
790 .spawn(move || {
791 let stop_signal = new_crossbeam_exit_rx();
792 BlockFetchCMD {
793 sync_shared,
794 p2p_control,
795 recv,
796 number,
797 can_start: CanStart::MinWorkNotReach,
798 start_timestamp: unix_time_as_millis(),
799 }
800 .run(stop_signal);
801 })
802 .expect("download thread can't start");
803 register_thread(THREAD_NAME, blockdownload_jh);
804 }
805 },
806 None => {
807 for peer in self.get_peers_to_fetch(ibd, &disconnect_list) {
808 if let Some(fetch) = self.get_blocks_to_fetch(peer, ibd) {
809 for item in fetch {
810 self.send_getblocks(item, nc, peer);
811 }
812 }
813 }
814 }
815 }
816 }
817
818 fn send_getblocks(
819 &self,
820 v_fetch: Vec<packed::Byte32>,
821 nc: &dyn CKBProtocolContext,
822 peer: PeerIndex,
823 ) {
824 let content = packed::GetBlocks::new_builder()
825 .block_hashes(v_fetch.clone())
826 .build();
827 let message = packed::SyncMessage::new_builder().set(content).build();
828
829 debug!("send_getblocks len={:?} to peer={}", v_fetch.len(), peer);
830 let _status = send_message_to(nc, peer, &message);
831 }
832}
833
834#[async_trait]
835impl CKBProtocolHandler for Synchronizer {
836 async fn init(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>) {
837 nc.set_notify(SYNC_NOTIFY_INTERVAL, SEND_GET_HEADERS_TOKEN)
839 .await
840 .expect("set_notify at init is ok");
841 nc.set_notify(SYNC_NOTIFY_INTERVAL, TIMEOUT_EVICTION_TOKEN)
842 .await
843 .expect("set_notify at init is ok");
844 nc.set_notify(IBD_BLOCK_FETCH_INTERVAL, IBD_BLOCK_FETCH_TOKEN)
845 .await
846 .expect("set_notify at init is ok");
847 nc.set_notify(NOT_IBD_BLOCK_FETCH_INTERVAL, NOT_IBD_BLOCK_FETCH_TOKEN)
848 .await
849 .expect("set_notify at init is ok");
850 nc.set_notify(Duration::from_secs(2), NO_PEER_CHECK_TOKEN)
851 .await
852 .expect("set_notify at init is ok");
853 }
854
855 async fn received(
856 &mut self,
857 nc: Arc<dyn CKBProtocolContext + Sync>,
858 peer_index: PeerIndex,
859 data: Bytes,
860 ) {
861 let msg = match packed::SyncMessageReader::from_compatible_slice(&data) {
862 Ok(msg) => {
863 let item = msg.to_enum();
864 if let packed::SyncMessageUnionReader::SendBlock(ref reader) = item {
865 if reader.has_extra_fields() || reader.block().count_extra_fields() > 1 {
866 info!(
867 "A malformed message from peer {}: \
868 excessive fields detected in SendBlock",
869 peer_index
870 );
871 nc.ban_peer(
872 peer_index,
873 BAD_MESSAGE_BAN_TIME,
874 String::from(
875 "send us a malformed message: \
876 too many fields in SendBlock",
877 ),
878 );
879 return;
880 } else {
881 item
882 }
883 } else {
884 match packed::SyncMessageReader::from_slice(&data) {
885 Ok(msg) => msg.to_enum(),
886 _ => {
887 info!(
888 "A malformed message from peer {}: \
889 excessive fields",
890 peer_index
891 );
892 nc.ban_peer(
893 peer_index,
894 BAD_MESSAGE_BAN_TIME,
895 String::from(
896 "send us a malformed message: \
897 too many fields",
898 ),
899 );
900 return;
901 }
902 }
903 }
904 }
905 _ => {
906 info!("A malformed message from peer {}", peer_index);
907 nc.ban_peer(
908 peer_index,
909 BAD_MESSAGE_BAN_TIME,
910 String::from("send us a malformed message"),
911 );
912 return;
913 }
914 };
915
916 debug!("Received msg {} from {}", msg.item_name(), peer_index);
917 #[cfg(feature = "with_sentry")]
918 {
919 let sentry_hub = sentry::Hub::current();
920 let _scope_guard = sentry_hub.push_scope();
921 sentry_hub.configure_scope(|scope| {
922 scope.set_tag("p2p.protocol", "synchronizer");
923 scope.set_tag("p2p.message", msg.item_name());
924 });
925 }
926
927 let start_time = Instant::now();
928 tokio::task::block_in_place(|| self.process(nc, peer_index, msg));
929 debug!(
930 "Process message={}, peer={}, cost={:?}",
931 msg.item_name(),
932 peer_index,
933 Instant::now().saturating_duration_since(start_time),
934 );
935 }
936
937 async fn connected(
938 &mut self,
939 nc: Arc<dyn CKBProtocolContext + Sync>,
940 peer_index: PeerIndex,
941 _version: &str,
942 ) {
943 info!("SyncProtocol.connected peer={}", peer_index);
944 self.on_connected(nc.as_ref(), peer_index);
945 }
946
947 async fn disconnected(
948 &mut self,
949 _nc: Arc<dyn CKBProtocolContext + Sync>,
950 peer_index: PeerIndex,
951 ) {
952 let sync_state = self.shared().state();
953 sync_state.disconnected(peer_index);
954 info!("SyncProtocol.disconnected peer={}", peer_index);
955 }
956
957 async fn notify(&mut self, nc: Arc<dyn CKBProtocolContext + Sync>, token: u64) {
958 if !self.peers().state.is_empty() {
959 let start_time = Instant::now();
960 trace!("Start notify token={}", token);
961 match token {
962 SEND_GET_HEADERS_TOKEN => {
963 self.start_sync_headers(nc.as_ref());
964 }
965 IBD_BLOCK_FETCH_TOKEN => {
966 if self.shared.active_chain().is_initial_block_download() {
967 self.find_blocks_to_fetch(nc.as_ref(), IBDState::In);
968 } else {
969 {
970 self.shared.state().write_inflight_blocks().adjustment = false;
971 }
972 self.shared.state().peers().clear_unknown_list();
973 if nc.remove_notify(IBD_BLOCK_FETCH_TOKEN).await.is_err() {
974 trace!("Ibd block fetch token removal failed");
975 }
976 }
977 }
978 NOT_IBD_BLOCK_FETCH_TOKEN => {
979 if !self.shared.active_chain().is_initial_block_download() {
980 self.find_blocks_to_fetch(nc.as_ref(), IBDState::Out);
981 }
982 }
983 TIMEOUT_EVICTION_TOKEN => {
984 self.eviction(nc.as_ref());
985 }
986 _ => {}
988 }
989
990 trace!(
991 "Finished notify token={} cost={:?}",
992 token,
993 Instant::now().saturating_duration_since(start_time)
994 );
995 } else if token == NO_PEER_CHECK_TOKEN {
996 debug!("No peers connected");
997 }
998 }
999}