1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use crate::synchronizer::Synchronizer;
use crate::utils::{send_message, send_message_to};
use crate::{attempt, Status, StatusCode};
use ckb_constant::sync::MAX_LOCATOR_SIZE;
use ckb_logger::{debug, info};
use ckb_network::{CKBProtocolContext, PeerIndex, SupportProtocols};
use ckb_types::{
core,
packed::{self, Byte32},
prelude::*,
};
/// Handler for a single `GetHeaders` sync message attributed to one peer.
///
/// Build it with [`GetHeadersProcess::new`] and consume it with
/// [`GetHeadersProcess::execute`].
pub struct GetHeadersProcess<'a> {
    /// Reader over the `GetHeaders` message being processed.
    message: packed::GetHeadersReader<'a>,
    /// Synchronizer whose shared state and peer bookkeeping are consulted.
    synchronizer: &'a Synchronizer,
    /// Peer the message is attributed to; replies are addressed to it.
    peer: PeerIndex,
    /// Protocol context handed to the message-sending helpers.
    nc: &'a dyn CKBProtocolContext,
}
impl<'a> GetHeadersProcess<'a> {
    /// Creates a processor for one `GetHeaders` message.
    ///
    /// * `message` - reader over the received `GetHeaders` payload.
    /// * `synchronizer` - synchronizer whose shared state and peers are used.
    /// * `peer` - peer the message is attributed to; replies are sent to it.
    /// * `nc` - protocol context passed to the send helpers.
    pub fn new(
        message: packed::GetHeadersReader<'a>,
        synchronizer: &'a Synchronizer,
        peer: PeerIndex,
        nc: &'a dyn CKBProtocolContext,
    ) -> Self {
        GetHeadersProcess {
            message,
            nc,
            synchronizer,
            peer,
        }
    }

    /// Processes the message and reports the outcome as a [`Status`].
    ///
    /// Steps, in order:
    ///
    /// 1. A locator with more than `MAX_LOCATOR_SIZE` hashes is rejected with
    ///    `StatusCode::ProtocolMessageIsMalformed`.
    /// 2. While the active chain reports initial block download, an `InIBD`
    ///    message is sent back, the locator hashes are recorded as the peer's
    ///    unknown header list when the peer is outbound, whitelisted or
    ///    protected, and `Status::ignored()` is returned.
    /// 3. Otherwise the latest common block is looked up from the locator. If
    ///    none is found, `StatusCode::GetHeadersMissCommonAncestors` is
    ///    returned with the locator hashes as context. If one is found, the
    ///    headers from `get_locator_response` are sent back in a
    ///    `SendHeaders` message and `Status::ok()` is returned.
    pub fn execute(self) -> Status {
        let active_chain = self.synchronizer.shared.active_chain();

        // Check the locator length on the reader before materializing it, so
        // an oversized locator is rejected without converting any hash into an
        // owned value or allocating the vector.
        let locator_reader = self.message.block_locator_hashes();
        let locator_size = locator_reader.len();
        if locator_size > MAX_LOCATOR_SIZE {
            return StatusCode::ProtocolMessageIsMalformed.with_context(format!(
                "Locator count({}) > MAX_LOCATOR_SIZE({})",
                locator_size, MAX_LOCATOR_SIZE,
            ));
        }

        let block_locator_hashes = locator_reader
            .iter()
            .map(|x| x.to_entity())
            .collect::<Vec<Byte32>>();
        let hash_stop = self.message.hash_stop().to_entity();

        if active_chain.is_initial_block_download() {
            info!(
                "Ignoring getheaders from peer={} because node is in initial block download",
                self.peer
            );
            self.send_in_ibd();
            let state = self.synchronizer.shared.state();
            // Only keep the locator for peers flagged as outbound, whitelisted
            // or protected; for any other peer it is dropped.
            if let Some(flag) = state.peers().get_flag(self.peer) {
                if flag.is_outbound || flag.is_whitelist || flag.is_protect {
                    state.insert_peer_unknown_header_list(self.peer, block_locator_hashes);
                }
            };
            return Status::ignored();
        }

        let block_number = match active_chain
            .locate_latest_common_block(&hash_stop, &block_locator_hashes[..])
        {
            Some(number) => number,
            None => {
                return StatusCode::GetHeadersMissCommonAncestors
                    .with_context(format!("{:#x?}", block_locator_hashes,));
            }
        };

        debug!(
            "headers latest_common={} tip={} begin",
            block_number,
            active_chain.tip_header().number(),
        );

        self.synchronizer.peers().getheaders_received(self.peer);
        let headers: Vec<core::HeaderView> =
            active_chain.get_locator_response(block_number, &hash_stop);
        // response headers

        debug!("headers len={}", headers.len());

        let content = packed::SendHeaders::new_builder()
            .headers(headers.into_iter().map(|x| x.data()).pack())
            .build();
        let message = packed::SyncMessage::new_builder().set(content).build();

        // NOTE(review): `attempt!` presumably returns early with the failing
        // status when the send does not succeed - confirm against the macro.
        attempt!(send_message_to(self.nc, self.peer, &message));

        Status::ok()
    }

    /// Sends an `InIBD` sync message to the peer on the `Sync` protocol.
    ///
    /// Best-effort: the result of the send is deliberately discarded.
    fn send_in_ibd(&self) {
        let content = packed::InIBD::new_builder().build();
        let message = packed::SyncMessage::new_builder().set(content).build();
        let _ignore = send_message(
            SupportProtocols::Sync.protocol_id(),
            self.nc,
            self.peer,
            &message,
        );
    }
}