1use crate::synchronizer::Synchronizer;
2use crate::types::{ActiveChain, SyncShared};
3use crate::{Status, StatusCode};
4use ckb_constant::sync::MAX_HEADERS_LEN;
5use ckb_error::Error;
6use ckb_logger::{Level, debug, log_enabled, warn};
7use ckb_network::{CKBProtocolContext, PeerIndex};
8use ckb_shared::block_status::BlockStatus;
9use ckb_traits::HeaderFieldsProvider;
10use ckb_types::{core, packed, prelude::*};
11use ckb_verification::{HeaderError, HeaderVerifier};
12use ckb_verification_traits::Verifier;
13
14pub struct HeadersProcess<'a> {
15 message: packed::SendHeadersReader<'a>,
16 synchronizer: &'a Synchronizer,
17 peer: PeerIndex,
18 nc: &'a dyn CKBProtocolContext,
19 active_chain: ActiveChain,
20}
21
22impl<'a> HeadersProcess<'a> {
23 pub fn new(
24 message: packed::SendHeadersReader<'a>,
25 synchronizer: &'a Synchronizer,
26 peer: PeerIndex,
27 nc: &'a dyn CKBProtocolContext,
28 ) -> Self {
29 let active_chain = synchronizer.shared.active_chain();
30 HeadersProcess {
31 message,
32 nc,
33 synchronizer,
34 peer,
35 active_chain,
36 }
37 }
38
39 fn is_continuous(&self, headers: &[core::HeaderView]) -> bool {
40 for window in headers.windows(2) {
41 if let [parent, header] = &window {
42 if header.data().raw().parent_hash() != parent.hash() {
43 debug!(
44 "header.parent_hash {} parent.hash {}",
45 header.parent_hash(),
46 parent.hash()
47 );
48 return false;
49 }
50 }
51 }
52 true
53 }
54
55 pub fn accept_first(&self, first: &core::HeaderView) -> ValidationResult {
56 let shared: &SyncShared = self.synchronizer.shared();
57 let verifier = HeaderVerifier::new(shared, shared.consensus());
58 let acceptor = HeaderAcceptor::new(first, self.peer, verifier, self.active_chain.clone());
59 acceptor.accept()
60 }
61
62 fn debug(&self) {
63 if log_enabled!(Level::Debug) {
64 let shared_best_known = self.synchronizer.shared.state().shared_best_header();
66 let peer_best_known = self.synchronizer.peers().get_best_known_header(self.peer);
67 debug!(
68 "chain: num={}, diff={:#x};",
69 self.active_chain.tip_number(),
70 self.active_chain.total_difficulty()
71 );
72 debug!(
73 "shared best_known_header: num={}, diff={:#x}, hash={};",
74 shared_best_known.number(),
75 shared_best_known.total_difficulty(),
76 shared_best_known.hash(),
77 );
78 if let Some(header) = peer_best_known {
79 debug!(
80 "peer's best_known_header: peer: {}, num={}; diff={:#x}, hash={};",
81 self.peer,
82 header.number(),
83 header.total_difficulty(),
84 header.hash()
85 );
86 } else {
87 debug!("state: null;");
88 }
89 debug!("peer: {}", self.peer);
90 }
91 }
92
93 pub fn execute(self) -> Status {
94 debug!("HeadersProcess begins");
95 let shared: &SyncShared = self.synchronizer.shared();
96 let consensus = shared.consensus();
97 let headers = self
98 .message
99 .headers()
100 .to_entity()
101 .into_iter()
102 .map(packed::Header::into_view)
103 .collect::<Vec<_>>();
104
105 if headers.len() > MAX_HEADERS_LEN {
106 warn!("HeadersProcess is oversized");
107 return StatusCode::HeadersIsInvalid.with_context("oversize");
108 }
109
110 if headers.is_empty() {
111 debug!("HeadersProcess is_empty (synchronized)");
117 if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
118 self.synchronizer
119 .shared()
120 .state()
121 .tip_synced(state.value_mut());
122 }
123 return Status::ok();
124 }
125
126 if !self.is_continuous(&headers) {
127 warn!("HeadersProcess is not continuous");
128 return StatusCode::HeadersIsInvalid.with_context("not continuous");
129 }
130
131 let result = self.accept_first(&headers[0]);
132 match result.state {
133 ValidationState::Invalid => {
134 debug!(
135 "HeadersProcess accept_first result is invalid, error = {:?}, first header = {:?}",
136 result.error, headers[0]
137 );
138 return StatusCode::HeadersIsInvalid
139 .with_context(format!("accept first header {:?}", headers[0]));
140 }
141 ValidationState::TemporaryInvalid => {
142 debug!(
143 "HeadersProcess accept_first result is temporary invalid, first header = {:?}",
144 headers[0]
145 );
146 return Status::ok();
147 }
148 ValidationState::Valid => {
149 }
151 };
152
153 for header in headers.iter().skip(1) {
154 let verifier = HeaderVerifier::new(shared, consensus);
155 let acceptor =
156 HeaderAcceptor::new(header, self.peer, verifier, self.active_chain.clone());
157 let result = acceptor.accept();
158 match result.state {
159 ValidationState::Invalid => {
160 debug!(
161 "HeadersProcess accept result is invalid, error = {:?}, header = {:?}",
162 result.error, headers,
163 );
164 return StatusCode::HeadersIsInvalid
165 .with_context(format!("accept header {header:?}"));
166 }
167 ValidationState::TemporaryInvalid => {
168 debug!(
169 "HeadersProcess accept result is temporarily invalid, header = {:?}",
170 header
171 );
172 return Status::ok();
173 }
174 ValidationState::Valid => {
175 }
177 };
178 }
179
180 self.debug();
181
182 if headers.len() == MAX_HEADERS_LEN {
183 let start = headers.last().expect("empty checked").into();
184 self.active_chain
185 .send_getheaders_to_peer(self.nc, self.peer, start);
186 } else if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
187 self.synchronizer
188 .shared()
189 .state()
190 .tip_synced(state.value_mut());
191 }
192
193 let peer_flags = self
196 .synchronizer
197 .peers()
198 .get_flag(self.peer)
199 .unwrap_or_default();
200 if self.active_chain.is_initial_block_download()
201 && headers.len() != MAX_HEADERS_LEN
202 && (!peer_flags.is_protect && !peer_flags.is_whitelist && peer_flags.is_outbound)
203 {
204 debug!("Disconnect an unprotected outbound peer ({})", self.peer);
205 if let Err(err) = self
206 .nc
207 .disconnect(self.peer, "useless outbound peer in IBD")
208 {
209 return StatusCode::Network.with_context(format!("Disconnect error: {err:?}"));
210 }
211 }
212
213 Status::ok()
214 }
215}
216
217pub struct HeaderAcceptor<'a, DL: HeaderFieldsProvider> {
218 header: &'a core::HeaderView,
219 active_chain: ActiveChain,
220 peer: PeerIndex,
221 verifier: HeaderVerifier<'a, DL>,
222}
223
224impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
225 pub fn new(
226 header: &'a core::HeaderView,
227 peer: PeerIndex,
228 verifier: HeaderVerifier<'a, DL>,
229 active_chain: ActiveChain,
230 ) -> Self {
231 HeaderAcceptor {
232 header,
233 peer,
234 verifier,
235 active_chain,
236 }
237 }
238
239 pub fn prev_block_check(&self, state: &mut ValidationResult) -> Result<(), ()> {
240 if self.active_chain.contains_block_status(
241 &self.header.data().raw().parent_hash(),
242 BlockStatus::BLOCK_INVALID,
243 ) {
244 state.invalid(Some(ValidationError::InvalidParent));
245 return Err(());
246 }
247 Ok(())
248 }
249
250 pub fn non_contextual_check(&self, state: &mut ValidationResult) -> Result<(), bool> {
251 self.verifier.verify(self.header).map_err(|error| {
252 debug!(
253 "HeadersProcess accepted {:?} error {:?}",
254 self.header.number(),
255 error
256 );
257 if let Some(header_error) = error.downcast_ref::<HeaderError>() {
259 if header_error.is_too_new() {
260 state.temporary_invalid(Some(ValidationError::Verify(error)));
261 false
262 } else {
263 state.invalid(Some(ValidationError::Verify(error)));
264 true
265 }
266 } else {
267 state.invalid(Some(ValidationError::Verify(error)));
268 true
269 }
270 })
271 }
272
273 pub fn version_check(&self, state: &mut ValidationResult) -> Result<(), ()> {
274 if self.header.version() != 0 {
275 state.invalid(Some(ValidationError::Version));
276 Err(())
277 } else {
278 Ok(())
279 }
280 }
281
282 pub fn accept(&self) -> ValidationResult {
283 let mut result = ValidationResult::default();
284 let sync_shared = self.active_chain.sync_shared();
285 let state = self.active_chain.state();
286 let shared = sync_shared.shared();
287
288 let status = self.active_chain.get_block_status(&self.header.hash());
291 if status.contains(BlockStatus::HEADER_VALID) {
292 let header_index = sync_shared
293 .get_header_index_view(
294 &self.header.hash(),
295 status.contains(BlockStatus::BLOCK_STORED),
296 )
297 .unwrap_or_else(|| {
298 panic!(
299 "header {}-{} with HEADER_VALID should exist",
300 self.header.number(),
301 self.header.hash()
302 )
303 })
304 .as_header_index();
305 state
306 .peers()
307 .may_set_best_known_header(self.peer, header_index);
308 return result;
309 }
310
311 if self.prev_block_check(&mut result).is_err() {
312 debug!(
313 "HeadersProcess rejected invalid-parent header: {} {}",
314 self.header.number(),
315 self.header.hash(),
316 );
317 shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
318 return result;
319 }
320
321 if let Some(is_invalid) = self.non_contextual_check(&mut result).err() {
322 debug!(
323 "HeadersProcess rejected non-contextual header: {} {}",
324 self.header.number(),
325 self.header.hash(),
326 );
327 if is_invalid {
328 shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
329 }
330 return result;
331 }
332
333 if self.version_check(&mut result).is_err() {
334 debug!(
335 "HeadersProcess rejected invalid-version header: {} {}",
336 self.header.number(),
337 self.header.hash(),
338 );
339 shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
340 return result;
341 }
342
343 sync_shared.insert_valid_header(self.peer, self.header);
344 result
345 }
346}
347
/// Outcome category of validating one header.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ValidationState {
    /// No check has failed; this is the default state.
    #[default]
    Valid,
    /// A check failed in a way that is not treated as final.
    TemporaryInvalid,
    /// A check failed.
    Invalid,
}
355
356#[allow(dead_code)]
357#[derive(Debug)]
358pub enum ValidationError {
359 Verify(Error),
360 Version,
361 InvalidParent,
362}
363
364#[derive(Debug, Default)]
365pub struct ValidationResult {
366 pub error: Option<ValidationError>,
367 pub state: ValidationState,
368}
369
370impl ValidationResult {
371 pub fn invalid(&mut self, error: Option<ValidationError>) {
372 self.error = error;
373 self.state = ValidationState::Invalid;
374 }
375
376 pub fn temporary_invalid(&mut self, error: Option<ValidationError>) {
377 self.error = error;
378 self.state = ValidationState::TemporaryInvalid;
379 }
380}