ckb_sync/synchronizer/
headers_process.rs

1use crate::synchronizer::Synchronizer;
2use crate::types::{ActiveChain, SyncShared};
3use crate::{Status, StatusCode};
4use ckb_constant::sync::MAX_HEADERS_LEN;
5use ckb_error::Error;
6use ckb_logger::{Level, debug, log_enabled, warn};
7use ckb_network::{CKBProtocolContext, PeerIndex};
8use ckb_shared::block_status::BlockStatus;
9use ckb_traits::HeaderFieldsProvider;
10use ckb_types::{core, packed, prelude::*};
11use ckb_verification::{HeaderError, HeaderVerifier};
12use ckb_verification_traits::Verifier;
13
14pub struct HeadersProcess<'a> {
15    message: packed::SendHeadersReader<'a>,
16    synchronizer: &'a Synchronizer,
17    peer: PeerIndex,
18    nc: &'a dyn CKBProtocolContext,
19    active_chain: ActiveChain,
20}
21
22impl<'a> HeadersProcess<'a> {
23    pub fn new(
24        message: packed::SendHeadersReader<'a>,
25        synchronizer: &'a Synchronizer,
26        peer: PeerIndex,
27        nc: &'a dyn CKBProtocolContext,
28    ) -> Self {
29        let active_chain = synchronizer.shared.active_chain();
30        HeadersProcess {
31            message,
32            nc,
33            synchronizer,
34            peer,
35            active_chain,
36        }
37    }
38
39    fn is_continuous(&self, headers: &[core::HeaderView]) -> bool {
40        for window in headers.windows(2) {
41            if let [parent, header] = &window {
42                if header.data().raw().parent_hash() != parent.hash() {
43                    debug!(
44                        "header.parent_hash {} parent.hash {}",
45                        header.parent_hash(),
46                        parent.hash()
47                    );
48                    return false;
49                }
50            }
51        }
52        true
53    }
54
55    pub fn accept_first(&self, first: &core::HeaderView) -> ValidationResult {
56        let shared: &SyncShared = self.synchronizer.shared();
57        let verifier = HeaderVerifier::new(shared, shared.consensus());
58        let acceptor = HeaderAcceptor::new(first, self.peer, verifier, self.active_chain.clone());
59        acceptor.accept()
60    }
61
62    fn debug(&self) {
63        if log_enabled!(Level::Debug) {
64            // Regain the updated best known
65            let shared_best_known = self.synchronizer.shared.state().shared_best_header();
66            let peer_best_known = self.synchronizer.peers().get_best_known_header(self.peer);
67            debug!(
68                "chain: num={}, diff={:#x};",
69                self.active_chain.tip_number(),
70                self.active_chain.total_difficulty()
71            );
72            debug!(
73                "shared best_known_header: num={}, diff={:#x}, hash={};",
74                shared_best_known.number(),
75                shared_best_known.total_difficulty(),
76                shared_best_known.hash(),
77            );
78            if let Some(header) = peer_best_known {
79                debug!(
80                    "peer's best_known_header: peer: {}, num={}; diff={:#x}, hash={};",
81                    self.peer,
82                    header.number(),
83                    header.total_difficulty(),
84                    header.hash()
85                );
86            } else {
87                debug!("state: null;");
88            }
89            debug!("peer: {}", self.peer);
90        }
91    }
92
93    pub fn execute(self) -> Status {
94        debug!("HeadersProcess begins");
95        let shared: &SyncShared = self.synchronizer.shared();
96        let consensus = shared.consensus();
97        let headers = self
98            .message
99            .headers()
100            .to_entity()
101            .into_iter()
102            .map(packed::Header::into_view)
103            .collect::<Vec<_>>();
104
105        if headers.len() > MAX_HEADERS_LEN {
106            warn!("HeadersProcess is oversized");
107            return StatusCode::HeadersIsInvalid.with_context("oversize");
108        }
109
110        if headers.is_empty() {
111            // Empty means that the other peer's tip may be consistent with our own best known,
112            // but empty cannot 100% confirm this, so it does not set the other peer's best header
113            // to the shared best known.
114            // This action means that if the newly connected node has not been sync with headers,
115            // it cannot be used as a synchronization node.
116            debug!("HeadersProcess is_empty (synchronized)");
117            if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
118                self.synchronizer
119                    .shared()
120                    .state()
121                    .tip_synced(state.value_mut());
122            }
123            return Status::ok();
124        }
125
126        if !self.is_continuous(&headers) {
127            warn!("HeadersProcess is not continuous");
128            return StatusCode::HeadersIsInvalid.with_context("not continuous");
129        }
130
131        let result = self.accept_first(&headers[0]);
132        match result.state {
133            ValidationState::Invalid => {
134                debug!(
135                    "HeadersProcess accept_first result is invalid, error = {:?}, first header = {:?}",
136                    result.error, headers[0]
137                );
138                return StatusCode::HeadersIsInvalid
139                    .with_context(format!("accept first header {:?}", headers[0]));
140            }
141            ValidationState::TemporaryInvalid => {
142                debug!(
143                    "HeadersProcess accept_first result is temporary invalid, first header = {:?}",
144                    headers[0]
145                );
146                return Status::ok();
147            }
148            ValidationState::Valid => {
149                // Valid, do nothing
150            }
151        };
152
153        for header in headers.iter().skip(1) {
154            let verifier = HeaderVerifier::new(shared, consensus);
155            let acceptor =
156                HeaderAcceptor::new(header, self.peer, verifier, self.active_chain.clone());
157            let result = acceptor.accept();
158            match result.state {
159                ValidationState::Invalid => {
160                    debug!(
161                        "HeadersProcess accept result is invalid, error = {:?}, header = {:?}",
162                        result.error, headers,
163                    );
164                    return StatusCode::HeadersIsInvalid
165                        .with_context(format!("accept header {header:?}"));
166                }
167                ValidationState::TemporaryInvalid => {
168                    debug!(
169                        "HeadersProcess accept result is temporarily invalid, header = {:?}",
170                        header
171                    );
172                    return Status::ok();
173                }
174                ValidationState::Valid => {
175                    // Valid, do nothing
176                }
177            };
178        }
179
180        self.debug();
181
182        if headers.len() == MAX_HEADERS_LEN {
183            let start = headers.last().expect("empty checked").into();
184            self.active_chain
185                .send_getheaders_to_peer(self.nc, self.peer, start);
186        } else if let Some(mut state) = self.synchronizer.peers().state.get_mut(&self.peer) {
187            self.synchronizer
188                .shared()
189                .state()
190                .tip_synced(state.value_mut());
191        }
192
193        // If we're in IBD, we want outbound peers that will serve us a useful
194        // chain. Disconnect peers that are on chains with insufficient work.
195        let peer_flags = self
196            .synchronizer
197            .peers()
198            .get_flag(self.peer)
199            .unwrap_or_default();
200        if self.active_chain.is_initial_block_download()
201            && headers.len() != MAX_HEADERS_LEN
202            && (!peer_flags.is_protect && !peer_flags.is_whitelist && peer_flags.is_outbound)
203        {
204            debug!("Disconnect an unprotected outbound peer ({})", self.peer);
205            if let Err(err) = self
206                .nc
207                .disconnect(self.peer, "useless outbound peer in IBD")
208            {
209                return StatusCode::Network.with_context(format!("Disconnect error: {err:?}"));
210            }
211        }
212
213        Status::ok()
214    }
215}
216
217pub struct HeaderAcceptor<'a, DL: HeaderFieldsProvider> {
218    header: &'a core::HeaderView,
219    active_chain: ActiveChain,
220    peer: PeerIndex,
221    verifier: HeaderVerifier<'a, DL>,
222}
223
224impl<'a, DL: HeaderFieldsProvider> HeaderAcceptor<'a, DL> {
225    pub fn new(
226        header: &'a core::HeaderView,
227        peer: PeerIndex,
228        verifier: HeaderVerifier<'a, DL>,
229        active_chain: ActiveChain,
230    ) -> Self {
231        HeaderAcceptor {
232            header,
233            peer,
234            verifier,
235            active_chain,
236        }
237    }
238
239    pub fn prev_block_check(&self, state: &mut ValidationResult) -> Result<(), ()> {
240        if self.active_chain.contains_block_status(
241            &self.header.data().raw().parent_hash(),
242            BlockStatus::BLOCK_INVALID,
243        ) {
244            state.invalid(Some(ValidationError::InvalidParent));
245            return Err(());
246        }
247        Ok(())
248    }
249
250    pub fn non_contextual_check(&self, state: &mut ValidationResult) -> Result<(), bool> {
251        self.verifier.verify(self.header).map_err(|error| {
252            debug!(
253                "HeadersProcess accepted {:?} error {:?}",
254                self.header.number(),
255                error
256            );
257            // HeaderVerifier return HeaderError or UnknownParentError
258            if let Some(header_error) = error.downcast_ref::<HeaderError>() {
259                if header_error.is_too_new() {
260                    state.temporary_invalid(Some(ValidationError::Verify(error)));
261                    false
262                } else {
263                    state.invalid(Some(ValidationError::Verify(error)));
264                    true
265                }
266            } else {
267                state.invalid(Some(ValidationError::Verify(error)));
268                true
269            }
270        })
271    }
272
273    pub fn version_check(&self, state: &mut ValidationResult) -> Result<(), ()> {
274        if self.header.version() != 0 {
275            state.invalid(Some(ValidationError::Version));
276            Err(())
277        } else {
278            Ok(())
279        }
280    }
281
282    pub fn accept(&self) -> ValidationResult {
283        let mut result = ValidationResult::default();
284        let sync_shared = self.active_chain.sync_shared();
285        let state = self.active_chain.state();
286        let shared = sync_shared.shared();
287
288        // FIXME If status == BLOCK_INVALID then return early. But which error
289        // type should we return?
290        let status = self.active_chain.get_block_status(&self.header.hash());
291        if status.contains(BlockStatus::HEADER_VALID) {
292            let header_index = sync_shared
293                .get_header_index_view(
294                    &self.header.hash(),
295                    status.contains(BlockStatus::BLOCK_STORED),
296                )
297                .unwrap_or_else(|| {
298                    panic!(
299                        "header {}-{} with HEADER_VALID should exist",
300                        self.header.number(),
301                        self.header.hash()
302                    )
303                })
304                .as_header_index();
305            state
306                .peers()
307                .may_set_best_known_header(self.peer, header_index);
308            return result;
309        }
310
311        if self.prev_block_check(&mut result).is_err() {
312            debug!(
313                "HeadersProcess rejected invalid-parent header: {} {}",
314                self.header.number(),
315                self.header.hash(),
316            );
317            shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
318            return result;
319        }
320
321        if let Some(is_invalid) = self.non_contextual_check(&mut result).err() {
322            debug!(
323                "HeadersProcess rejected non-contextual header: {} {}",
324                self.header.number(),
325                self.header.hash(),
326            );
327            if is_invalid {
328                shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
329            }
330            return result;
331        }
332
333        if self.version_check(&mut result).is_err() {
334            debug!(
335                "HeadersProcess rejected invalid-version header: {} {}",
336                self.header.number(),
337                self.header.hash(),
338            );
339            shared.insert_block_status(self.header.hash(), BlockStatus::BLOCK_INVALID);
340            return result;
341        }
342
343        sync_shared.insert_valid_header(self.peer, self.header);
344        result
345    }
346}
347
/// Outcome category of validating a single header.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ValidationState {
    /// No check failed. This is the default state.
    #[default]
    Valid,
    /// A check failed in a way that is recorded as temporary.
    TemporaryInvalid,
    /// A check failed definitively.
    Invalid,
}
355
356#[allow(dead_code)]
357#[derive(Debug)]
358pub enum ValidationError {
359    Verify(Error),
360    Version,
361    InvalidParent,
362}
363
364#[derive(Debug, Default)]
365pub struct ValidationResult {
366    pub error: Option<ValidationError>,
367    pub state: ValidationState,
368}
369
370impl ValidationResult {
371    pub fn invalid(&mut self, error: Option<ValidationError>) {
372        self.error = error;
373        self.state = ValidationState::Invalid;
374    }
375
376    pub fn temporary_invalid(&mut self, error: Option<ValidationError>) {
377        self.error = error;
378        self.state = ValidationState::TemporaryInvalid;
379    }
380}