Skip to main content

soil_client/import/queue/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Import Queue primitive: something which can verify and import blocks.
8//!
9//! This serves as an intermediate and abstracted step between synchronization
10//! and import. Each mode of consensus will have its own requirements for block
11//! verification. Some algorithms can verify in parallel, while others only
12//! sequentially.
13//!
//! The `ImportQueue` trait allows such verification strategies to be
//! instantiated. The `BasicQueue` type and the `Verifier` trait allow serial
//! queues to be instantiated simply.
17
18use log::{debug, trace};
19use std::{
20	fmt,
21	time::{Duration, Instant},
22};
23
24use crate::consensus::{error::Error as ConsensusError, BlockOrigin};
25use subsoil::runtime::{
26	traits::{Block as BlockT, Header as _, NumberFor},
27	Justifications,
28};
29
30use super::{
31	block_import::{
32		BlockCheckParams, BlockImport, BlockImportParams, ImportResult, ImportedAux, ImportedState,
33		JustificationImport, StateAction,
34	},
35	metrics::Metrics,
36};
37
38pub use basic_queue::BasicQueue;
39
/// Log target used by the `debug!`/`trace!` calls in this file.
const LOG_TARGET: &str = "sync::import-queue";
41
/// A commonly-used Import Queue type.
///
/// This is an alias for [`BasicQueue`] over the given block type.
pub type DefaultImportQueue<Block> = BasicQueue<Block>;
46
47mod basic_queue;
48pub mod buffered_link;
49pub mod mock;
50
/// Boxed, type-erased block import handle used by the queue.
///
/// Accepts any `BlockImport` implementation for `B` whose error type is
/// [`ConsensusError`] and which is `Send + Sync`.
pub type BoxBlockImport<B> = Box<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;
53
/// Boxed, type-erased justification import handle used by the queue.
///
/// Accepts any `JustificationImport` implementation for `B` whose error type is
/// [`ConsensusError`] and which is `Send + Sync`.
pub type BoxJustificationImport<B> =
	Box<dyn JustificationImport<B, Error = ConsensusError> + Send + Sync>;
57
/// Opaque identifier of the source that supplied network-imported data.
///
/// The identifier is stored as an uninterpreted byte string; this type only
/// wraps and unwraps those bytes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RuntimeOrigin(Vec<u8>);

impl RuntimeOrigin {
	/// Wrap the given bytes as a runtime origin.
	pub fn from_bytes(bytes: Vec<u8>) -> Self {
		RuntimeOrigin(bytes)
	}

	/// Borrow the underlying bytes.
	pub fn as_bytes(&self) -> &[u8] {
		self.0.as_slice()
	}

	/// Unwrap the origin, yielding the owned bytes.
	pub fn into_bytes(self) -> Vec<u8> {
		let RuntimeOrigin(raw) = self;
		raw
	}
}

impl From<Vec<u8>> for RuntimeOrigin {
	fn from(bytes: Vec<u8>) -> Self {
		RuntimeOrigin(bytes)
	}
}
84
/// Block data used by the queue.
///
/// Apart from `hash`, the data fields are optional; a block without a header is
/// rejected during verification.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct IncomingBlock<B: BlockT> {
	/// Block header hash.
	pub hash: <B as BlockT>::Hash,
	/// Block header if requested.
	pub header: Option<<B as BlockT>::Header>,
	/// Block body if requested.
	pub body: Option<Vec<<B as BlockT>::Extrinsic>>,
	/// Indexed block body if requested.
	pub indexed_body: Option<Vec<Vec<u8>>>,
	/// Justification(s) if requested.
	pub justifications: Option<Justifications>,
	/// The origin (peer) this block was received from, if any.
	pub origin: Option<RuntimeOrigin>,
	/// Allow importing the block skipping state verification if parent state is missing.
	pub allow_missing_state: bool,
	/// Skip block execution and state verification.
	pub skip_execution: bool,
	/// Re-validate existing block.
	pub import_existing: bool,
	/// Do not compute new state; import the given state instead.
	pub state: Option<ImportedState<B>>,
}
109
/// Verifies blocks before they are handed over to block import.
#[async_trait::async_trait]
pub trait Verifier<B: BlockT>: Send + Sync {
	/// Verify the given block data and return the `BlockImportParams` to
	/// continue the block import process.
	///
	/// On failure the error is a message string describing why verification
	/// was rejected.
	async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>;
}
117
/// Blocks import queue API.
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
pub trait ImportQueueService<B: BlockT>: Send {
	/// Import a bunch of blocks, every next block must be an ancestor of the previous block in the
	/// list.
	///
	/// NOTE(review): "ancestor" looks inverted here; presumably each block is expected to be a
	/// descendant of the one before it. Confirm against the queue implementation.
	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

	/// Import block justifications.
	///
	/// `who` identifies the source that supplied the justifications; `hash` and `number`
	/// identify the block they belong to.
	fn import_justifications(
		&mut self,
		who: RuntimeOrigin,
		hash: B::Hash,
		number: NumberFor<B>,
		justifications: Justifications,
	);
}
135
/// An import queue: hands out [`ImportQueueService`] handles for submitting work and
/// reports results through a [`Link`], driven either by polling or by [`ImportQueue::run`].
#[async_trait::async_trait]
pub trait ImportQueue<B: BlockT>: Send {
	/// Get a copy of the handle to [`ImportQueueService`].
	fn service(&self) -> Box<dyn ImportQueueService<B>>;

	/// Get a mutable reference to the handle to [`ImportQueueService`].
	fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;

	/// This method should behave in a way similar to `Future::poll`. It can register the current
	/// task and notify later when more actions are ready to be polled. To continue the comparison,
	/// it is as if this method always returned `Poll::Pending`.
	fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &dyn Link<B>);

	/// Start asynchronous runner for import queue.
	///
	/// Takes an object implementing [`Link`] which allows the import queue to
	/// influence the synchronization process. Consumes the queue.
	async fn run(self, link: &dyn Link<B>);
}
155
/// The result of importing a justification.
///
/// Passed to [`Link::justification_imported`].
#[derive(Debug, PartialEq)]
pub enum JustificationImportResult {
	/// Justification was imported successfully.
	Success,

	/// Justification was not imported successfully.
	Failure,

	/// Justification was not imported successfully, because it is outdated.
	OutdatedJustification,
}
168
/// Hooks that the verification queue can use to influence the synchronization
/// algorithm.
///
/// Every method has an empty default implementation, so implementors only need
/// to override the hooks they care about.
pub trait Link<B: BlockT>: Send + Sync {
	/// Batch of blocks imported, with or without error.
	///
	/// `_results` pairs each block's import result with its hash.
	/// NOTE(review): `_imported` and `_count` are presumably the number of successfully
	/// imported blocks and the batch size; confirm against the queue implementation.
	fn blocks_processed(
		&self,
		_imported: usize,
		_count: usize,
		_results: Vec<(BlockImportResult<B>, B::Hash)>,
	) {
	}

	/// Justification import result.
	///
	/// `_who` is the source of the justification; `_hash` and `_number` identify the block.
	fn justification_imported(
		&self,
		_who: RuntimeOrigin,
		_hash: &B::Hash,
		_number: NumberFor<B>,
		_import_result: JustificationImportResult,
	) {
	}

	/// Request a justification for the given block.
	fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}
}
194
/// Block import successful result.
///
/// Both variants carry the block number and the origin the block came from, if any.
#[derive(Debug, PartialEq)]
pub enum BlockImportStatus<BlockNumber: fmt::Debug + PartialEq> {
	/// Imported known block: the block was reported as already being in the chain.
	ImportedKnown(BlockNumber, Option<RuntimeOrigin>),
	/// Imported unknown block, together with the auxiliary data returned by the import.
	ImportedUnknown(BlockNumber, ImportedAux, Option<RuntimeOrigin>),
}
203
204impl<BlockNumber: fmt::Debug + PartialEq> BlockImportStatus<BlockNumber> {
205	/// Returns the imported block number.
206	pub fn number(&self) -> &BlockNumber {
207		match self {
208			BlockImportStatus::ImportedKnown(n, _)
209			| BlockImportStatus::ImportedUnknown(n, _, _) => n,
210		}
211	}
212}
213
/// Block import error.
///
/// Variants that can be attributed to a source carry the origin of the offending block, if any.
#[derive(Debug, thiserror::Error)]
pub enum BlockImportError {
	/// Block is missing its header and can't be imported.
	#[error("block is missing a header (origin = {0:?})")]
	IncompleteHeader(Option<RuntimeOrigin>),

	/// Block verification failed, so the block can't be imported. Carries the verifier's message.
	#[error("block verification failed (origin = {0:?}): {1}")]
	VerificationFailed(Option<RuntimeOrigin>, String),

	/// Block is known to be bad.
	#[error("bad block (origin = {0:?})")]
	BadBlock(Option<RuntimeOrigin>),

	/// Parent state is missing.
	#[error("block is missing parent state")]
	MissingState,

	/// Block has an unknown parent.
	#[error("block has an unknown parent")]
	UnknownParent,

	/// Block import has been cancelled. This can happen if the parent block fails to be imported.
	#[error("import has been cancelled")]
	Cancelled,

	/// Any other error reported by consensus.
	#[error("consensus error: {0}")]
	Other(ConsensusError),
}
245
/// Result of importing a single block: the import status keyed by the block's number type on
/// success, or a [`BlockImportError`] on failure.
type BlockImportResult<B> = Result<BlockImportStatus<NumberFor<B>>, BlockImportError>;
247
248/// Single block import function.
249pub async fn import_single_block<B: BlockT, V: Verifier<B>>(
250	import_handle: &mut impl BlockImport<B, Error = ConsensusError>,
251	block_origin: BlockOrigin,
252	block: IncomingBlock<B>,
253	verifier: &V,
254) -> BlockImportResult<B> {
255	match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
256		SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
257		SingleBlockVerificationOutcome::Verified(import_parameters) => {
258			import_single_block_metered(import_handle, import_parameters, None).await
259		},
260	}
261}
262
263fn import_handler<Block>(
264	number: NumberFor<Block>,
265	hash: Block::Hash,
266	parent_hash: Block::Hash,
267	block_origin: Option<RuntimeOrigin>,
268	import: Result<ImportResult, ConsensusError>,
269) -> Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>
270where
271	Block: BlockT,
272{
273	match import {
274		Ok(ImportResult::AlreadyInChain) => {
275			trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash);
276			Ok(BlockImportStatus::ImportedKnown(number, block_origin))
277		},
278		Ok(ImportResult::Imported(aux)) => {
279			Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin))
280		},
281		Ok(ImportResult::MissingState) => {
282			debug!(
283				target: LOG_TARGET,
284				"Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash
285			);
286			Err(BlockImportError::MissingState)
287		},
288		Ok(ImportResult::UnknownParent) => {
289			debug!(
290				target: LOG_TARGET,
291				"Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash
292			);
293			Err(BlockImportError::UnknownParent)
294		},
295		Ok(ImportResult::KnownBad) => {
296			debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash);
297			Err(BlockImportError::BadBlock(block_origin))
298		},
299		Err(e) => {
300			debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e);
301			Err(BlockImportError::Other(e))
302		},
303	}
304}
305
/// Outcome of [`verify_single_block_metered`].
pub(crate) enum SingleBlockVerificationOutcome<Block: BlockT> {
	/// Block is already imported.
	Imported(BlockImportStatus<NumberFor<Block>>),
	/// Block is verified, but needs to be imported.
	Verified(SingleBlockImportParameters<Block>),
}
312
/// Everything needed to import a block that has already passed verification.
///
/// Produced by [`verify_single_block_metered`] and consumed by [`import_single_block_metered`].
pub(crate) struct SingleBlockImportParameters<Block: BlockT> {
	/// Parameters handed to the block import.
	import_block: BlockImportParams<Block>,
	/// Hash of the block being imported.
	hash: Block::Hash,
	/// Origin the block was received from, if any.
	block_origin: Option<RuntimeOrigin>,
	/// Time spent verifying the block; added to the import time when reporting metrics.
	verification_time: Duration,
}
319
320/// Single block import function with metering.
321pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
322	import_handle: &impl BlockImport<B, Error = ConsensusError>,
323	block_origin: BlockOrigin,
324	block: IncomingBlock<B>,
325	verifier: &V,
326	metrics: Option<&Metrics>,
327) -> Result<SingleBlockVerificationOutcome<B>, BlockImportError> {
328	let peer = block.origin;
329	let justifications = block.justifications;
330
331	let Some(header) = block.header else {
332		if let Some(ref peer) = peer {
333			debug!(target: LOG_TARGET, "Header {} was not provided by {:?} ", block.hash, peer);
334		} else {
335			debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
336		}
337		return Err(BlockImportError::IncompleteHeader(peer));
338	};
339
340	let number = *header.number();
341	let hash = block.hash;
342	let parent_hash = *header.parent_hash();
343
344	trace!(target: LOG_TARGET, "Block {number} ({hash}) has {:?} logs (origin: {:?})", header.digest().logs().len(), block_origin);
345
346	// Skip block verification for warp synced blocks.
347	// They have been verified within warp sync proof verification.
348	if matches!(block_origin, BlockOrigin::WarpSync) {
349		return Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
350			import_block: BlockImportParams::new(block_origin, header),
351			hash: block.hash,
352			block_origin: peer,
353			verification_time: Duration::ZERO,
354		}));
355	}
356
357	match import_handler::<B>(
358		number,
359		hash,
360		parent_hash,
361		peer.clone(),
362		import_handle
363			.check_block(BlockCheckParams {
364				hash,
365				number,
366				parent_hash,
367				allow_missing_state: block.allow_missing_state,
368				import_existing: block.import_existing,
369				allow_missing_parent: block.state.is_some(),
370			})
371			.await,
372	)? {
373		BlockImportStatus::ImportedUnknown { .. } => (),
374		r => {
375			// Any other successful result means that the block is already imported.
376			return Ok(SingleBlockVerificationOutcome::Imported(r));
377		},
378	}
379
380	let started = Instant::now();
381
382	let mut import_block = BlockImportParams::new(block_origin, header);
383	import_block.body = block.body;
384	import_block.justifications = justifications;
385	import_block.post_hash = Some(hash);
386	import_block.import_existing = block.import_existing;
387	import_block.indexed_body = block.indexed_body;
388
389	if let Some(state) = block.state {
390		let changes = super::block_import::StorageChanges::Import(state);
391		import_block.state_action = StateAction::ApplyChanges(changes);
392	} else if block.skip_execution {
393		import_block.state_action = StateAction::Skip;
394	} else if block.allow_missing_state {
395		import_block.state_action = StateAction::ExecuteIfPossible;
396	}
397
398	let import_block = verifier.verify(import_block).await.map_err(|msg| {
399		if let Some(ref peer) = peer {
400			trace!(
401				target: LOG_TARGET,
402				"Verifying {}({}) from {:?} failed: {}",
403				number,
404				hash,
405				peer,
406				msg
407			);
408		} else {
409			trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg);
410		}
411		if let Some(metrics) = metrics {
412			metrics.report_verification(false, started.elapsed());
413		}
414		BlockImportError::VerificationFailed(peer.clone(), msg)
415	})?;
416
417	let verification_time = started.elapsed();
418	if let Some(metrics) = metrics {
419		metrics.report_verification(true, verification_time);
420	}
421
422	Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
423		import_block,
424		hash,
425		block_origin: peer,
426		verification_time,
427	}))
428}
429
430pub(crate) async fn import_single_block_metered<Block: BlockT>(
431	import_handle: &mut impl BlockImport<Block, Error = ConsensusError>,
432	import_parameters: SingleBlockImportParameters<Block>,
433	metrics: Option<&Metrics>,
434) -> BlockImportResult<Block> {
435	let started = Instant::now();
436
437	let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } =
438		import_parameters;
439
440	let number = *import_block.header.number();
441	let parent_hash = *import_block.header.parent_hash();
442
443	let imported = import_handle.import_block(import_block).await;
444	if let Some(metrics) = metrics {
445		metrics.report_verification_and_import(started.elapsed() + verification_time);
446	}
447
448	import_handler::<Block>(number, hash, parent_hash, block_origin, imported)
449}