1use log::{debug, trace};
31use std::{
32 fmt,
33 time::{Duration, Instant},
34};
35
36use sp_consensus::{error::Error as ConsensusError, BlockOrigin};
37use sp_runtime::{
38 traits::{Block as BlockT, Header as _, NumberFor},
39 Justifications,
40};
41
42use crate::{
43 block_import::{
44 BlockCheckParams, BlockImport, BlockImportParams, ImportResult, ImportedAux, ImportedState,
45 JustificationImport, StateAction,
46 },
47 metrics::Metrics,
48};
49
50pub use basic_queue::BasicQueue;
51
52const LOG_TARGET: &str = "sync::import-queue";
53
54pub type DefaultImportQueue<Block> = BasicQueue<Block>;
58
59mod basic_queue;
60pub mod buffered_link;
61pub mod mock;
62
63pub type BoxBlockImport<B> = Box<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;
65
66pub type BoxJustificationImport<B> =
68 Box<dyn JustificationImport<B, Error = ConsensusError> + Send + Sync>;
69
70pub type RuntimeOrigin = sc_network_types::PeerId;
72
73#[derive(Debug, PartialEq, Eq, Clone)]
75pub struct IncomingBlock<B: BlockT> {
76 pub hash: <B as BlockT>::Hash,
78 pub header: Option<<B as BlockT>::Header>,
80 pub body: Option<Vec<<B as BlockT>::Extrinsic>>,
82 pub indexed_body: Option<Vec<Vec<u8>>>,
84 pub justifications: Option<Justifications>,
86 pub origin: Option<RuntimeOrigin>,
88 pub allow_missing_state: bool,
90 pub skip_execution: bool,
92 pub import_existing: bool,
94 pub state: Option<ImportedState<B>>,
96}
97
98#[async_trait::async_trait]
100pub trait Verifier<B: BlockT>: Send + Sync {
101 async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>;
104}
105
106pub trait ImportQueueService<B: BlockT>: Send {
110 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
113
114 fn import_justifications(
116 &mut self,
117 who: RuntimeOrigin,
118 hash: B::Hash,
119 number: NumberFor<B>,
120 justifications: Justifications,
121 );
122}
123
124#[async_trait::async_trait]
125pub trait ImportQueue<B: BlockT>: Send {
126 fn service(&self) -> Box<dyn ImportQueueService<B>>;
128
129 fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
131
132 fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &dyn Link<B>);
136
137 async fn run(self, link: &dyn Link<B>);
142}
143
144#[derive(Debug, PartialEq)]
146pub enum JustificationImportResult {
147 Success,
149
150 Failure,
152
153 OutdatedJustification,
155}
156
157pub trait Link<B: BlockT>: Send + Sync {
160 fn blocks_processed(
162 &self,
163 _imported: usize,
164 _count: usize,
165 _results: Vec<(BlockImportResult<B>, B::Hash)>,
166 ) {
167 }
168
169 fn justification_imported(
171 &self,
172 _who: RuntimeOrigin,
173 _hash: &B::Hash,
174 _number: NumberFor<B>,
175 _import_result: JustificationImportResult,
176 ) {
177 }
178
179 fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}
181}
182
183#[derive(Debug, PartialEq)]
185pub enum BlockImportStatus<BlockNumber: fmt::Debug + PartialEq> {
186 ImportedKnown(BlockNumber, Option<RuntimeOrigin>),
188 ImportedUnknown(BlockNumber, ImportedAux, Option<RuntimeOrigin>),
190}
191
192impl<BlockNumber: fmt::Debug + PartialEq> BlockImportStatus<BlockNumber> {
193 pub fn number(&self) -> &BlockNumber {
195 match self {
196 BlockImportStatus::ImportedKnown(n, _) |
197 BlockImportStatus::ImportedUnknown(n, _, _) => n,
198 }
199 }
200}
201
202#[derive(Debug, thiserror::Error)]
204pub enum BlockImportError {
205 #[error("block is missing a header (origin = {0:?})")]
207 IncompleteHeader(Option<RuntimeOrigin>),
208
209 #[error("block verification failed (origin = {0:?}): {1}")]
211 VerificationFailed(Option<RuntimeOrigin>, String),
212
213 #[error("bad block (origin = {0:?})")]
215 BadBlock(Option<RuntimeOrigin>),
216
217 #[error("block is missing parent state")]
219 MissingState,
220
221 #[error("block has an unknown parent")]
223 UnknownParent,
224
225 #[error("import has been cancelled")]
227 Cancelled,
228
229 #[error("consensus error: {0}")]
231 Other(ConsensusError),
232}
233
234type BlockImportResult<B> = Result<BlockImportStatus<NumberFor<B>>, BlockImportError>;
235
236pub async fn import_single_block<B: BlockT, V: Verifier<B>>(
238 import_handle: &mut impl BlockImport<B, Error = ConsensusError>,
239 block_origin: BlockOrigin,
240 block: IncomingBlock<B>,
241 verifier: &V,
242) -> BlockImportResult<B> {
243 match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
244 SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
245 SingleBlockVerificationOutcome::Verified(import_parameters) => {
246 import_single_block_metered(import_handle, import_parameters, None).await
247 },
248 }
249}
250
251fn import_handler<Block>(
252 number: NumberFor<Block>,
253 hash: Block::Hash,
254 parent_hash: Block::Hash,
255 block_origin: Option<RuntimeOrigin>,
256 import: Result<ImportResult, ConsensusError>,
257) -> Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>
258where
259 Block: BlockT,
260{
261 match import {
262 Ok(ImportResult::AlreadyInChain) => {
263 trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash);
264 Ok(BlockImportStatus::ImportedKnown(number, block_origin))
265 },
266 Ok(ImportResult::Imported(aux)) => {
267 Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin))
268 },
269 Ok(ImportResult::MissingState) => {
270 debug!(
271 target: LOG_TARGET,
272 "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash
273 );
274 Err(BlockImportError::MissingState)
275 },
276 Ok(ImportResult::UnknownParent) => {
277 debug!(
278 target: LOG_TARGET,
279 "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash
280 );
281 Err(BlockImportError::UnknownParent)
282 },
283 Ok(ImportResult::KnownBad) => {
284 debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash);
285 Err(BlockImportError::BadBlock(block_origin))
286 },
287 Err(e) => {
288 debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e);
289 Err(BlockImportError::Other(e))
290 },
291 }
292}
293
294pub(crate) enum SingleBlockVerificationOutcome<Block: BlockT> {
295 Imported(BlockImportStatus<NumberFor<Block>>),
297 Verified(SingleBlockImportParameters<Block>),
299}
300
301pub(crate) struct SingleBlockImportParameters<Block: BlockT> {
302 import_block: BlockImportParams<Block>,
303 hash: Block::Hash,
304 block_origin: Option<RuntimeOrigin>,
305 verification_time: Duration,
306}
307
308pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
310 import_handle: &impl BlockImport<B, Error = ConsensusError>,
311 block_origin: BlockOrigin,
312 block: IncomingBlock<B>,
313 verifier: &V,
314 metrics: Option<&Metrics>,
315) -> Result<SingleBlockVerificationOutcome<B>, BlockImportError> {
316 let peer = block.origin;
317 let justifications = block.justifications;
318
319 let Some(header) = block.header else {
320 if let Some(ref peer) = peer {
321 debug!(target: LOG_TARGET, "Header {} was not provided by {peer} ", block.hash);
322 } else {
323 debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
324 }
325 return Err(BlockImportError::IncompleteHeader(peer));
326 };
327
328 let number = *header.number();
329 let hash = block.hash;
330 let parent_hash = *header.parent_hash();
331
332 trace!(target: LOG_TARGET, "Block {number} ({hash}) has {:?} logs (origin: {:?})", header.digest().logs().len(), block_origin);
333
334 if matches!(block_origin, BlockOrigin::WarpSync) {
337 return Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
338 import_block: BlockImportParams::new(block_origin, header),
339 hash: block.hash,
340 block_origin: peer,
341 verification_time: Duration::ZERO,
342 }));
343 }
344
345 match import_handler::<B>(
346 number,
347 hash,
348 parent_hash,
349 peer,
350 import_handle
351 .check_block(BlockCheckParams {
352 hash,
353 number,
354 parent_hash,
355 allow_missing_state: block.allow_missing_state,
356 import_existing: block.import_existing,
357 allow_missing_parent: block.state.is_some(),
358 })
359 .await,
360 )? {
361 BlockImportStatus::ImportedUnknown { .. } => (),
362 r => {
363 return Ok(SingleBlockVerificationOutcome::Imported(r));
365 },
366 }
367
368 let started = Instant::now();
369
370 let mut import_block = BlockImportParams::new(block_origin, header);
371 import_block.body = block.body;
372 import_block.justifications = justifications;
373 import_block.post_hash = Some(hash);
374 import_block.import_existing = block.import_existing;
375 import_block.indexed_body = block.indexed_body;
376
377 if let Some(state) = block.state {
378 let changes = crate::block_import::StorageChanges::Import(state);
379 import_block.state_action = StateAction::ApplyChanges(changes);
380 } else if block.skip_execution {
381 import_block.state_action = StateAction::Skip;
382 } else if block.allow_missing_state {
383 import_block.state_action = StateAction::ExecuteIfPossible;
384 }
385
386 let import_block = verifier.verify(import_block).await.map_err(|msg| {
387 if let Some(ref peer) = peer {
388 trace!(
389 target: LOG_TARGET,
390 "Verifying {}({}) from {} failed: {}",
391 number,
392 hash,
393 peer,
394 msg
395 );
396 } else {
397 trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg);
398 }
399 if let Some(metrics) = metrics {
400 metrics.report_verification(false, started.elapsed());
401 }
402 BlockImportError::VerificationFailed(peer, msg)
403 })?;
404
405 let verification_time = started.elapsed();
406 if let Some(metrics) = metrics {
407 metrics.report_verification(true, verification_time);
408 }
409
410 Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
411 import_block,
412 hash,
413 block_origin: peer,
414 verification_time,
415 }))
416}
417
418pub(crate) async fn import_single_block_metered<Block: BlockT>(
419 import_handle: &mut impl BlockImport<Block, Error = ConsensusError>,
420 import_parameters: SingleBlockImportParameters<Block>,
421 metrics: Option<&Metrics>,
422) -> BlockImportResult<Block> {
423 let started = Instant::now();
424
425 let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } =
426 import_parameters;
427
428 let number = *import_block.header.number();
429 let parent_hash = *import_block.header.parent_hash();
430
431 let imported = import_handle.import_block(import_block).await;
432 if let Some(metrics) = metrics {
433 metrics.report_verification_and_import(started.elapsed() + verification_time);
434 }
435
436 import_handler::<Block>(number, hash, parent_hash, block_origin, imported)
437}