Skip to main content

sc_consensus/
import_queue.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Import Queue primitive: something which can verify and import blocks.
20//!
21//! This serves as an intermediate and abstracted step between synchronization
22//! and import. Each mode of consensus will have its own requirements for block
23//! verification. Some algorithms can verify in parallel, while others only
24//! sequentially.
25//!
26//! The `ImportQueue` trait allows such verification strategies to be
27//! instantiated. The `BasicQueue` and `BasicVerifier` traits allow serial
28//! queues to be instantiated simply.
29
30use log::{debug, trace};
31use std::{
32	fmt,
33	time::{Duration, Instant},
34};
35
36use sp_consensus::{error::Error as ConsensusError, BlockOrigin};
37use sp_runtime::{
38	traits::{Block as BlockT, Header as _, NumberFor},
39	Justifications,
40};
41
42use crate::{
43	block_import::{
44		BlockCheckParams, BlockImport, BlockImportParams, ImportResult, ImportedAux, ImportedState,
45		JustificationImport, StateAction,
46	},
47	metrics::Metrics,
48};
49
50pub use basic_queue::BasicQueue;
51
/// Log target passed as `target:` to the `trace!` and `debug!` calls in this file.
const LOG_TARGET: &str = "sync::import-queue";
53
/// A commonly-used Import Queue type.
///
/// This is a plain alias for [`BasicQueue`] over the same `Block` type; it introduces no
/// additional type parameters of its own.
pub type DefaultImportQueue<Block> = BasicQueue<Block>;
58
// `basic_queue` itself is private; its `BasicQueue` type is re-exported from this module.
mod basic_queue;
// Public submodules, reachable by path from outside this module.
pub mod buffered_link;
pub mod mock;
62
/// Shared block import struct used by the queue.
///
/// A boxed [`BlockImport`] trait object whose error type is fixed to [`ConsensusError`] and
/// which must be `Send + Sync`.
pub type BoxBlockImport<B> = Box<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;

/// Shared justification import struct used by the queue.
///
/// A boxed [`JustificationImport`] trait object whose error type is fixed to
/// [`ConsensusError`] and which must be `Send + Sync`.
pub type BoxJustificationImport<B> =
	Box<dyn JustificationImport<B, Error = ConsensusError> + Send + Sync>;

/// Maps to the RuntimeOrigin used by the network.
///
/// Currently an alias for `sc_network_types::PeerId`. Throughout this file it identifies the
/// peer a block or justification was received from.
pub type RuntimeOrigin = sc_network_types::PeerId;
72
/// Block data used by the queue.
///
/// Apart from `hash` and the boolean flags, every part is optional. A block whose `header` is
/// `None` is rejected by `verify_single_block_metered` with
/// [`BlockImportError::IncompleteHeader`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct IncomingBlock<B: BlockT> {
	/// Block header hash.
	pub hash: <B as BlockT>::Hash,
	/// Block header if requested.
	pub header: Option<<B as BlockT>::Header>,
	/// Block body if requested.
	pub body: Option<Vec<<B as BlockT>::Extrinsic>>,
	/// Indexed block body if requested.
	pub indexed_body: Option<Vec<Vec<u8>>>,
	/// Justification(s) if requested.
	pub justifications: Option<Justifications>,
	/// The peer we received this block from, if any.
	pub origin: Option<RuntimeOrigin>,
	/// Allow importing the block skipping state verification if parent state is missing.
	///
	/// Forwarded to `BlockCheckParams`. When `state` is `None` and `skip_execution` is `false`,
	/// it also selects `StateAction::ExecuteIfPossible`.
	pub allow_missing_state: bool,
	/// Skip block execution and state verification.
	///
	/// Selects `StateAction::Skip` when `state` is `None`.
	pub skip_execution: bool,
	/// Re-validate existing block.
	pub import_existing: bool,
	/// Do not compute new state, but rather import the given one.
	///
	/// When this is `Some`, the block check is also told to allow a missing parent.
	pub state: Option<ImportedState<B>>,
}
97
/// Verifies a block before it is imported.
///
/// `verify_single_block_metered` in this file calls [`Verifier::verify`] after the block has
/// passed `check_block`. Blocks with `BlockOrigin::WarpSync` origin bypass the verifier there.
#[async_trait::async_trait]
pub trait Verifier<B: BlockT>: Send + Sync {
	/// Verify the given block data and return the `BlockImportParams` to
	/// continue the block import process.
	///
	/// # Errors
	///
	/// Returns a message describing the failure. The import functions in this file wrap that
	/// message in [`BlockImportError::VerificationFailed`].
	async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>;
}
105
/// Blocks import queue API.
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
/// Neither method returns a value; results are reported through the [`Link`] callbacks
/// (presumably by the queue implementation — confirm against the implementor).
pub trait ImportQueueService<B: BlockT>: Send {
	/// Import a bunch of blocks, every next block must be an ancestor of the previous block in the
	/// list.
	///
	/// NOTE(review): "ancestor" may be intended as "descendant" (a parent is normally imported
	/// before its child) — confirm against the queue implementation.
	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

	/// Import block justifications.
	///
	/// `who` is the peer that supplied the justifications; `hash` and `number` identify the
	/// block they refer to.
	fn import_justifications(
		&mut self,
		who: RuntimeOrigin,
		hash: B::Hash,
		number: NumberFor<B>,
		justifications: Justifications,
	);
}
123
/// Import queue interface.
///
/// Gives access to an [`ImportQueueService`] handle for submitting work, and is driven either
/// by repeatedly calling [`ImportQueue::poll_actions`] or by awaiting [`ImportQueue::run`].
#[async_trait::async_trait]
pub trait ImportQueue<B: BlockT>: Send {
	/// Get a copy of the handle to [`ImportQueueService`].
	fn service(&self) -> Box<dyn ImportQueueService<B>>;

	/// Get a reference to the handle to [`ImportQueueService`].
	fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;

	/// This method should behave in a way similar to `Future::poll`. It can register the current
	/// task and notify later when more actions are ready to be polled. To continue the comparison,
	/// it is as if this method always returned `Poll::Pending`.
	fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &dyn Link<B>);

	/// Start asynchronous runner for import queue.
	///
	/// Takes an object implementing [`Link`] which allows the import queue to
	/// influence the synchronization process. Consumes the queue.
	async fn run(self, link: &dyn Link<B>);
}
143
/// The result of importing a justification.
///
/// Reported to the synchronization side through [`Link::justification_imported`].
#[derive(Debug, PartialEq)]
pub enum JustificationImportResult {
	/// Justification was imported successfully.
	Success,

	/// Justification was not imported successfully.
	Failure,

	/// Justification was not imported successfully, because it is outdated.
	OutdatedJustification,
}
156
/// Hooks that the verification queue can use to influence the synchronization
/// algorithm.
///
/// Every method has an empty default body, so an implementor only needs to override the
/// notifications it cares about.
pub trait Link<B: BlockT>: Send + Sync {
	/// Batch of blocks imported, with or without error.
	///
	/// `_results` pairs each import result with a block hash. `_imported` and `_count` are
	/// presumably the number of successfully imported blocks and the batch size — TODO confirm
	/// against the caller.
	fn blocks_processed(
		&self,
		_imported: usize,
		_count: usize,
		_results: Vec<(BlockImportResult<B>, B::Hash)>,
	) {
	}

	/// Justification import result.
	///
	/// `_who` is the peer the justification is attributed to; `_hash` and `_number` identify
	/// the block it was for.
	fn justification_imported(
		&self,
		_who: RuntimeOrigin,
		_hash: &B::Hash,
		_number: NumberFor<B>,
		_import_result: JustificationImportResult,
	) {
	}

	/// Request a justification for the given block.
	fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}
}
182
/// Block import successful result.
///
/// Both variants carry the block number first and the peer the block came from (if any) last.
#[derive(Debug, PartialEq)]
pub enum BlockImportStatus<BlockNumber: fmt::Debug + PartialEq> {
	/// Imported known block.
	///
	/// Produced by `import_handler` when the import reports `ImportResult::AlreadyInChain`.
	ImportedKnown(BlockNumber, Option<RuntimeOrigin>),
	/// Imported unknown block.
	///
	/// Produced by `import_handler` when the import reports `ImportResult::Imported`; the
	/// [`ImportedAux`] from that result is carried along unchanged.
	ImportedUnknown(BlockNumber, ImportedAux, Option<RuntimeOrigin>),
}
191
192impl<BlockNumber: fmt::Debug + PartialEq> BlockImportStatus<BlockNumber> {
193	/// Returns the imported block number.
194	pub fn number(&self) -> &BlockNumber {
195		match self {
196			BlockImportStatus::ImportedKnown(n, _) |
197			BlockImportStatus::ImportedUnknown(n, _, _) => n,
198		}
199	}
200}
201
/// Block import error.
///
/// Variants that carry an `Option<RuntimeOrigin>` record the peer the offending block came
/// from, when one is known.
#[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
	/// Block is missing its header, so it can't be imported.
	#[error("block is missing a header (origin = {0:?})")]
	IncompleteHeader(Option<RuntimeOrigin>),

	/// Block verification failed, so it can't be imported. The `String` is the message
	/// returned by the [`Verifier`].
	#[error("block verification failed (origin = {0:?}): {1}")]
	VerificationFailed(Option<RuntimeOrigin>, String),

	/// Block is known to be bad (the import reported `ImportResult::KnownBad`).
	#[error("bad block (origin = {0:?})")]
	BadBlock(Option<RuntimeOrigin>),

	/// Parent state is missing.
	#[error("block is missing parent state")]
	MissingState,

	/// Block has an unknown parent.
	#[error("block has an unknown parent")]
	UnknownParent,

	/// Block import has been cancelled. This can happen if the parent block fails to be imported.
	#[error("import has been cancelled")]
	Cancelled,

	/// Any other error returned by the block import as a [`ConsensusError`].
	#[error("consensus error: {0}")]
	Other(ConsensusError),
}
233
/// Outcome of importing one block: the successful [`BlockImportStatus`] keyed by the block's
/// number type, or the [`BlockImportError`] that stopped the import.
type BlockImportResult<B> = Result<BlockImportStatus<NumberFor<B>>, BlockImportError>;
235
236/// Single block import function.
237pub async fn import_single_block<B: BlockT, V: Verifier<B>>(
238	import_handle: &mut impl BlockImport<B, Error = ConsensusError>,
239	block_origin: BlockOrigin,
240	block: IncomingBlock<B>,
241	verifier: &V,
242) -> BlockImportResult<B> {
243	match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
244		SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
245		SingleBlockVerificationOutcome::Verified(import_parameters) => {
246			import_single_block_metered(import_handle, import_parameters, None).await
247		},
248	}
249}
250
251fn import_handler<Block>(
252	number: NumberFor<Block>,
253	hash: Block::Hash,
254	parent_hash: Block::Hash,
255	block_origin: Option<RuntimeOrigin>,
256	import: Result<ImportResult, ConsensusError>,
257) -> Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>
258where
259	Block: BlockT,
260{
261	match import {
262		Ok(ImportResult::AlreadyInChain) => {
263			trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash);
264			Ok(BlockImportStatus::ImportedKnown(number, block_origin))
265		},
266		Ok(ImportResult::Imported(aux)) => {
267			Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin))
268		},
269		Ok(ImportResult::MissingState) => {
270			debug!(
271				target: LOG_TARGET,
272				"Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash
273			);
274			Err(BlockImportError::MissingState)
275		},
276		Ok(ImportResult::UnknownParent) => {
277			debug!(
278				target: LOG_TARGET,
279				"Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash
280			);
281			Err(BlockImportError::UnknownParent)
282		},
283		Ok(ImportResult::KnownBad) => {
284			debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash);
285			Err(BlockImportError::BadBlock(block_origin))
286		},
287		Err(e) => {
288			debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e);
289			Err(BlockImportError::Other(e))
290		},
291	}
292}
293
/// What `verify_single_block_metered` found out about a block.
pub(crate) enum SingleBlockVerificationOutcome<Block: BlockT> {
	/// Block is already imported.
	Imported(BlockImportStatus<NumberFor<Block>>),
	/// Block is verified, but needs to be imported.
	Verified(SingleBlockImportParameters<Block>),
}
300
/// Everything `import_single_block_metered` needs to import one already-verified block.
pub(crate) struct SingleBlockImportParameters<Block: BlockT> {
	/// Parameters handed to `BlockImport::import_block`.
	import_block: BlockImportParams<Block>,
	/// Hash of the block, passed on to `import_handler`.
	hash: Block::Hash,
	/// Peer the block was received from, if any. Despite the name this is not a
	/// [`BlockOrigin`].
	block_origin: Option<RuntimeOrigin>,
	/// Time spent verifying the block; added to the import time for the combined
	/// verification-and-import metric.
	verification_time: Duration,
}
307
308/// Single block import function with metering.
309pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
310	import_handle: &impl BlockImport<B, Error = ConsensusError>,
311	block_origin: BlockOrigin,
312	block: IncomingBlock<B>,
313	verifier: &V,
314	metrics: Option<&Metrics>,
315) -> Result<SingleBlockVerificationOutcome<B>, BlockImportError> {
316	let peer = block.origin;
317	let justifications = block.justifications;
318
319	let Some(header) = block.header else {
320		if let Some(ref peer) = peer {
321			debug!(target: LOG_TARGET, "Header {} was not provided by {peer} ", block.hash);
322		} else {
323			debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
324		}
325		return Err(BlockImportError::IncompleteHeader(peer));
326	};
327
328	let number = *header.number();
329	let hash = block.hash;
330	let parent_hash = *header.parent_hash();
331
332	trace!(target: LOG_TARGET, "Block {number} ({hash}) has {:?} logs (origin: {:?})", header.digest().logs().len(), block_origin);
333
334	// Skip block verification for warp synced blocks.
335	// They have been verified within warp sync proof verification.
336	if matches!(block_origin, BlockOrigin::WarpSync) {
337		return Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
338			import_block: BlockImportParams::new(block_origin, header),
339			hash: block.hash,
340			block_origin: peer,
341			verification_time: Duration::ZERO,
342		}));
343	}
344
345	match import_handler::<B>(
346		number,
347		hash,
348		parent_hash,
349		peer,
350		import_handle
351			.check_block(BlockCheckParams {
352				hash,
353				number,
354				parent_hash,
355				allow_missing_state: block.allow_missing_state,
356				import_existing: block.import_existing,
357				allow_missing_parent: block.state.is_some(),
358			})
359			.await,
360	)? {
361		BlockImportStatus::ImportedUnknown { .. } => (),
362		r => {
363			// Any other successful result means that the block is already imported.
364			return Ok(SingleBlockVerificationOutcome::Imported(r));
365		},
366	}
367
368	let started = Instant::now();
369
370	let mut import_block = BlockImportParams::new(block_origin, header);
371	import_block.body = block.body;
372	import_block.justifications = justifications;
373	import_block.post_hash = Some(hash);
374	import_block.import_existing = block.import_existing;
375	import_block.indexed_body = block.indexed_body;
376
377	if let Some(state) = block.state {
378		let changes = crate::block_import::StorageChanges::Import(state);
379		import_block.state_action = StateAction::ApplyChanges(changes);
380	} else if block.skip_execution {
381		import_block.state_action = StateAction::Skip;
382	} else if block.allow_missing_state {
383		import_block.state_action = StateAction::ExecuteIfPossible;
384	}
385
386	let import_block = verifier.verify(import_block).await.map_err(|msg| {
387		if let Some(ref peer) = peer {
388			trace!(
389				target: LOG_TARGET,
390				"Verifying {}({}) from {} failed: {}",
391				number,
392				hash,
393				peer,
394				msg
395			);
396		} else {
397			trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg);
398		}
399		if let Some(metrics) = metrics {
400			metrics.report_verification(false, started.elapsed());
401		}
402		BlockImportError::VerificationFailed(peer, msg)
403	})?;
404
405	let verification_time = started.elapsed();
406	if let Some(metrics) = metrics {
407		metrics.report_verification(true, verification_time);
408	}
409
410	Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
411		import_block,
412		hash,
413		block_origin: peer,
414		verification_time,
415	}))
416}
417
418pub(crate) async fn import_single_block_metered<Block: BlockT>(
419	import_handle: &mut impl BlockImport<Block, Error = ConsensusError>,
420	import_parameters: SingleBlockImportParameters<Block>,
421	metrics: Option<&Metrics>,
422) -> BlockImportResult<Block> {
423	let started = Instant::now();
424
425	let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } =
426		import_parameters;
427
428	let number = *import_block.header.number();
429	let parent_hash = *import_block.header.parent_hash();
430
431	let imported = import_handle.import_block(import_block).await;
432	if let Some(metrics) = metrics {
433		metrics.report_verification_and_import(started.elapsed() + verification_time);
434	}
435
436	import_handler::<Block>(number, hash, parent_hash, block_origin, imported)
437}