1use log::{debug, trace};
19use std::{
20 fmt,
21 time::{Duration, Instant},
22};
23
24use crate::consensus::{error::Error as ConsensusError, BlockOrigin};
25use subsoil::runtime::{
26 traits::{Block as BlockT, Header as _, NumberFor},
27 Justifications,
28};
29
30use super::{
31 block_import::{
32 BlockCheckParams, BlockImport, BlockImportParams, ImportResult, ImportedAux, ImportedState,
33 JustificationImport, StateAction,
34 },
35 metrics::Metrics,
36};
37
38pub use basic_queue::BasicQueue;
39
40const LOG_TARGET: &str = "sync::import-queue";
41
42pub type DefaultImportQueue<Block> = BasicQueue<Block>;
46
47mod basic_queue;
48pub mod buffered_link;
49pub mod mock;
50
51pub type BoxBlockImport<B> = Box<dyn BlockImport<B, Error = ConsensusError> + Send + Sync>;
53
54pub type BoxJustificationImport<B> =
56 Box<dyn JustificationImport<B, Error = ConsensusError> + Send + Sync>;
57
/// Opaque identifier of the party a block or justification came from.
///
/// The value is kept as raw bytes. The code in this module never inspects the bytes; it only
/// carries the origin through the import pipeline and echoes it back in results, errors and
/// log output.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct RuntimeOrigin(Vec<u8>);

impl RuntimeOrigin {
    /// Wraps `bytes` as an origin, taking ownership of the buffer.
    pub fn from_bytes(bytes: Vec<u8>) -> Self {
        RuntimeOrigin(bytes)
    }

    /// Borrows the raw bytes backing this origin.
    pub fn as_bytes(&self) -> &[u8] {
        self.0.as_slice()
    }

    /// Consumes the origin and hands back the underlying byte buffer.
    pub fn into_bytes(self) -> Vec<u8> {
        let RuntimeOrigin(bytes) = self;
        bytes
    }
}

impl From<Vec<u8>> for RuntimeOrigin {
    /// Equivalent to [`RuntimeOrigin::from_bytes`].
    fn from(bytes: Vec<u8>) -> Self {
        RuntimeOrigin(bytes)
    }
}
84
85#[derive(Debug, PartialEq, Eq, Clone)]
87pub struct IncomingBlock<B: BlockT> {
88 pub hash: <B as BlockT>::Hash,
90 pub header: Option<<B as BlockT>::Header>,
92 pub body: Option<Vec<<B as BlockT>::Extrinsic>>,
94 pub indexed_body: Option<Vec<Vec<u8>>>,
96 pub justifications: Option<Justifications>,
98 pub origin: Option<RuntimeOrigin>,
100 pub allow_missing_state: bool,
102 pub skip_execution: bool,
104 pub import_existing: bool,
106 pub state: Option<ImportedState<B>>,
108}
109
110#[async_trait::async_trait]
112pub trait Verifier<B: BlockT>: Send + Sync {
113 async fn verify(&self, block: BlockImportParams<B>) -> Result<BlockImportParams<B>, String>;
116}
117
118pub trait ImportQueueService<B: BlockT>: Send {
122 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);
125
126 fn import_justifications(
128 &mut self,
129 who: RuntimeOrigin,
130 hash: B::Hash,
131 number: NumberFor<B>,
132 justifications: Justifications,
133 );
134}
135
136#[async_trait::async_trait]
137pub trait ImportQueue<B: BlockT>: Send {
138 fn service(&self) -> Box<dyn ImportQueueService<B>>;
140
141 fn service_ref(&mut self) -> &mut dyn ImportQueueService<B>;
143
144 fn poll_actions(&mut self, cx: &mut futures::task::Context, link: &dyn Link<B>);
148
149 async fn run(self, link: &dyn Link<B>);
154}
155
/// Outcome of a justification import, as passed to [`Link::justification_imported`].
///
/// NOTE(review): the code producing these values is outside this file; the variant notes
/// below follow the variant names and should be confirmed against the producer.
#[derive(Debug, PartialEq)]
pub enum JustificationImportResult {
    /// The justification import succeeded.
    Success,

    /// The justification import failed.
    Failure,

    /// The justification was considered outdated.
    OutdatedJustification,
}
168
169pub trait Link<B: BlockT>: Send + Sync {
172 fn blocks_processed(
174 &self,
175 _imported: usize,
176 _count: usize,
177 _results: Vec<(BlockImportResult<B>, B::Hash)>,
178 ) {
179 }
180
181 fn justification_imported(
183 &self,
184 _who: RuntimeOrigin,
185 _hash: &B::Hash,
186 _number: NumberFor<B>,
187 _import_result: JustificationImportResult,
188 ) {
189 }
190
191 fn request_justification(&self, _hash: &B::Hash, _number: NumberFor<B>) {}
193}
194
195#[derive(Debug, PartialEq)]
197pub enum BlockImportStatus<BlockNumber: fmt::Debug + PartialEq> {
198 ImportedKnown(BlockNumber, Option<RuntimeOrigin>),
200 ImportedUnknown(BlockNumber, ImportedAux, Option<RuntimeOrigin>),
202}
203
204impl<BlockNumber: fmt::Debug + PartialEq> BlockImportStatus<BlockNumber> {
205 pub fn number(&self) -> &BlockNumber {
207 match self {
208 BlockImportStatus::ImportedKnown(n, _)
209 | BlockImportStatus::ImportedUnknown(n, _, _) => n,
210 }
211 }
212}
213
214#[derive(Debug, thiserror::Error)]
216pub enum BlockImportError {
217 #[error("block is missing a header (origin = {0:?})")]
219 IncompleteHeader(Option<RuntimeOrigin>),
220
221 #[error("block verification failed (origin = {0:?}): {1}")]
223 VerificationFailed(Option<RuntimeOrigin>, String),
224
225 #[error("bad block (origin = {0:?})")]
227 BadBlock(Option<RuntimeOrigin>),
228
229 #[error("block is missing parent state")]
231 MissingState,
232
233 #[error("block has an unknown parent")]
235 UnknownParent,
236
237 #[error("import has been cancelled")]
239 Cancelled,
240
241 #[error("consensus error: {0}")]
243 Other(ConsensusError),
244}
245
246type BlockImportResult<B> = Result<BlockImportStatus<NumberFor<B>>, BlockImportError>;
247
248pub async fn import_single_block<B: BlockT, V: Verifier<B>>(
250 import_handle: &mut impl BlockImport<B, Error = ConsensusError>,
251 block_origin: BlockOrigin,
252 block: IncomingBlock<B>,
253 verifier: &V,
254) -> BlockImportResult<B> {
255 match verify_single_block_metered(import_handle, block_origin, block, verifier, None).await? {
256 SingleBlockVerificationOutcome::Imported(import_status) => Ok(import_status),
257 SingleBlockVerificationOutcome::Verified(import_parameters) => {
258 import_single_block_metered(import_handle, import_parameters, None).await
259 },
260 }
261}
262
263fn import_handler<Block>(
264 number: NumberFor<Block>,
265 hash: Block::Hash,
266 parent_hash: Block::Hash,
267 block_origin: Option<RuntimeOrigin>,
268 import: Result<ImportResult, ConsensusError>,
269) -> Result<BlockImportStatus<NumberFor<Block>>, BlockImportError>
270where
271 Block: BlockT,
272{
273 match import {
274 Ok(ImportResult::AlreadyInChain) => {
275 trace!(target: LOG_TARGET, "Block already in chain {}: {:?}", number, hash);
276 Ok(BlockImportStatus::ImportedKnown(number, block_origin))
277 },
278 Ok(ImportResult::Imported(aux)) => {
279 Ok(BlockImportStatus::ImportedUnknown(number, aux, block_origin))
280 },
281 Ok(ImportResult::MissingState) => {
282 debug!(
283 target: LOG_TARGET,
284 "Parent state is missing for {}: {:?}, parent: {:?}", number, hash, parent_hash
285 );
286 Err(BlockImportError::MissingState)
287 },
288 Ok(ImportResult::UnknownParent) => {
289 debug!(
290 target: LOG_TARGET,
291 "Block with unknown parent {}: {:?}, parent: {:?}", number, hash, parent_hash
292 );
293 Err(BlockImportError::UnknownParent)
294 },
295 Ok(ImportResult::KnownBad) => {
296 debug!(target: LOG_TARGET, "Peer gave us a bad block {}: {:?}", number, hash);
297 Err(BlockImportError::BadBlock(block_origin))
298 },
299 Err(e) => {
300 debug!(target: LOG_TARGET, "Error importing block {}: {:?}: {}", number, hash, e);
301 Err(BlockImportError::Other(e))
302 },
303 }
304}
305
306pub(crate) enum SingleBlockVerificationOutcome<Block: BlockT> {
307 Imported(BlockImportStatus<NumberFor<Block>>),
309 Verified(SingleBlockImportParameters<Block>),
311}
312
313pub(crate) struct SingleBlockImportParameters<Block: BlockT> {
314 import_block: BlockImportParams<Block>,
315 hash: Block::Hash,
316 block_origin: Option<RuntimeOrigin>,
317 verification_time: Duration,
318}
319
320pub(crate) async fn verify_single_block_metered<B: BlockT, V: Verifier<B>>(
322 import_handle: &impl BlockImport<B, Error = ConsensusError>,
323 block_origin: BlockOrigin,
324 block: IncomingBlock<B>,
325 verifier: &V,
326 metrics: Option<&Metrics>,
327) -> Result<SingleBlockVerificationOutcome<B>, BlockImportError> {
328 let peer = block.origin;
329 let justifications = block.justifications;
330
331 let Some(header) = block.header else {
332 if let Some(ref peer) = peer {
333 debug!(target: LOG_TARGET, "Header {} was not provided by {:?} ", block.hash, peer);
334 } else {
335 debug!(target: LOG_TARGET, "Header {} was not provided ", block.hash);
336 }
337 return Err(BlockImportError::IncompleteHeader(peer));
338 };
339
340 let number = *header.number();
341 let hash = block.hash;
342 let parent_hash = *header.parent_hash();
343
344 trace!(target: LOG_TARGET, "Block {number} ({hash}) has {:?} logs (origin: {:?})", header.digest().logs().len(), block_origin);
345
346 if matches!(block_origin, BlockOrigin::WarpSync) {
349 return Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
350 import_block: BlockImportParams::new(block_origin, header),
351 hash: block.hash,
352 block_origin: peer,
353 verification_time: Duration::ZERO,
354 }));
355 }
356
357 match import_handler::<B>(
358 number,
359 hash,
360 parent_hash,
361 peer.clone(),
362 import_handle
363 .check_block(BlockCheckParams {
364 hash,
365 number,
366 parent_hash,
367 allow_missing_state: block.allow_missing_state,
368 import_existing: block.import_existing,
369 allow_missing_parent: block.state.is_some(),
370 })
371 .await,
372 )? {
373 BlockImportStatus::ImportedUnknown { .. } => (),
374 r => {
375 return Ok(SingleBlockVerificationOutcome::Imported(r));
377 },
378 }
379
380 let started = Instant::now();
381
382 let mut import_block = BlockImportParams::new(block_origin, header);
383 import_block.body = block.body;
384 import_block.justifications = justifications;
385 import_block.post_hash = Some(hash);
386 import_block.import_existing = block.import_existing;
387 import_block.indexed_body = block.indexed_body;
388
389 if let Some(state) = block.state {
390 let changes = super::block_import::StorageChanges::Import(state);
391 import_block.state_action = StateAction::ApplyChanges(changes);
392 } else if block.skip_execution {
393 import_block.state_action = StateAction::Skip;
394 } else if block.allow_missing_state {
395 import_block.state_action = StateAction::ExecuteIfPossible;
396 }
397
398 let import_block = verifier.verify(import_block).await.map_err(|msg| {
399 if let Some(ref peer) = peer {
400 trace!(
401 target: LOG_TARGET,
402 "Verifying {}({}) from {:?} failed: {}",
403 number,
404 hash,
405 peer,
406 msg
407 );
408 } else {
409 trace!(target: LOG_TARGET, "Verifying {}({}) failed: {}", number, hash, msg);
410 }
411 if let Some(metrics) = metrics {
412 metrics.report_verification(false, started.elapsed());
413 }
414 BlockImportError::VerificationFailed(peer.clone(), msg)
415 })?;
416
417 let verification_time = started.elapsed();
418 if let Some(metrics) = metrics {
419 metrics.report_verification(true, verification_time);
420 }
421
422 Ok(SingleBlockVerificationOutcome::Verified(SingleBlockImportParameters {
423 import_block,
424 hash,
425 block_origin: peer,
426 verification_time,
427 }))
428}
429
430pub(crate) async fn import_single_block_metered<Block: BlockT>(
431 import_handle: &mut impl BlockImport<Block, Error = ConsensusError>,
432 import_parameters: SingleBlockImportParameters<Block>,
433 metrics: Option<&Metrics>,
434) -> BlockImportResult<Block> {
435 let started = Instant::now();
436
437 let SingleBlockImportParameters { import_block, hash, block_origin, verification_time } =
438 import_parameters;
439
440 let number = *import_block.header.number();
441 let parent_hash = *import_block.header.parent_hash();
442
443 let imported = import_handle.import_block(import_block).await;
444 if let Some(metrics) = metrics {
445 metrics.report_verification_and_import(started.elapsed() + verification_time);
446 }
447
448 import_handler::<Block>(number, hash, parent_hash, block_origin, imported)
449}