1use futures::{
19 prelude::*,
20 task::{Context, Poll},
21};
22use log::{debug, trace};
23use prometheus_endpoint::Registry;
24use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
25use sp_consensus::BlockOrigin;
26use sp_runtime::{
27 traits::{Block as BlockT, Header as HeaderT, NumberFor},
28 Justification, Justifications,
29};
30use std::pin::Pin;
31
32use crate::{
33 import_queue::{
34 buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
35 import_single_block_metered, verify_single_block_metered, BlockImportError,
36 BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService,
37 IncomingBlock, JustificationImportResult, Link, RuntimeOrigin,
38 SingleBlockVerificationOutcome, Verifier, LOG_TARGET,
39 },
40 metrics::Metrics,
41};
42
/// Import queue front end that forwards all work to a background worker future.
///
/// The worker future is created and spawned in [`BasicQueue::new`]; this struct only keeps
/// the channel ends needed to talk to it.
pub struct BasicQueue<B: BlockT> {
    /// Sending halves of the channels that feed block and justification requests to the worker.
    handle: BasicQueueHandle<B>,
    /// Receiving end of the buffered link whose sending end is handed to the worker; it is
    /// drained by `poll_actions` and `run`.
    result_port: BufferedLinkReceiver<B>,
}
51
52impl<B: BlockT> Drop for BasicQueue<B> {
53 fn drop(&mut self) {
54 self.handle.close();
56 self.result_port.close();
57 }
58}
59
60impl<B: BlockT> BasicQueue<B> {
61 pub fn new<V>(
65 verifier: V,
66 block_import: BoxBlockImport<B>,
67 justification_import: Option<BoxJustificationImport<B>>,
68 spawner: &impl sp_core::traits::SpawnEssentialNamed,
69 prometheus_registry: Option<&Registry>,
70 ) -> Self
71 where
72 V: Verifier<B> + 'static,
73 {
74 let (result_sender, result_port) = buffered_link::buffered_link(100_000);
75
76 let metrics = prometheus_registry.and_then(|r| {
77 Metrics::register(r)
78 .map_err(|err| {
79 log::warn!("Failed to register Prometheus metrics: {}", err);
80 })
81 .ok()
82 });
83
84 let (future, justification_sender, block_import_sender) = BlockImportWorker::new(
85 result_sender,
86 verifier,
87 block_import,
88 justification_import,
89 metrics,
90 );
91
92 spawner.spawn_essential_blocking(
93 "basic-block-import-worker",
94 Some("block-import"),
95 future.boxed(),
96 );
97
98 Self {
99 handle: BasicQueueHandle::new(justification_sender, block_import_sender),
100 result_port,
101 }
102 }
103}
104
/// Cloneable pair of channel senders used to submit work to the background import worker.
#[derive(Clone)]
struct BasicQueueHandle<B: BlockT> {
    /// Channel carrying one `ImportJustification` message per justification to import.
    justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
    /// Channel carrying `ImportBlocks` batches to import.
    block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
}
112
113impl<B: BlockT> BasicQueueHandle<B> {
114 pub fn new(
115 justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
116 block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
117 ) -> Self {
118 Self { justification_sender, block_import_sender }
119 }
120
121 pub fn close(&mut self) {
122 self.justification_sender.close();
123 self.block_import_sender.close();
124 }
125}
126
127impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
128 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
129 if blocks.is_empty() {
130 return;
131 }
132
133 trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());
134 let res = self
135 .block_import_sender
136 .unbounded_send(worker_messages::ImportBlocks(origin, blocks));
137
138 if res.is_err() {
139 log::error!(
140 target: LOG_TARGET,
141 "import_blocks: Background import task is no longer alive"
142 );
143 }
144 }
145
146 fn import_justifications(
147 &mut self,
148 who: RuntimeOrigin,
149 hash: B::Hash,
150 number: NumberFor<B>,
151 justifications: Justifications,
152 ) {
153 for justification in justifications {
154 let res = self.justification_sender.unbounded_send(
155 worker_messages::ImportJustification(who, hash, number, justification),
156 );
157
158 if res.is_err() {
159 log::error!(
160 target: LOG_TARGET,
161 "import_justification: Background import task is no longer alive"
162 );
163 }
164 }
165 }
166}
167
168#[async_trait::async_trait]
169impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
170 fn service(&self) -> Box<dyn ImportQueueService<B>> {
172 Box::new(self.handle.clone())
173 }
174
175 fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
177 &mut self.handle
178 }
179
180 fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) {
182 if self.result_port.poll_actions(cx, link).is_err() {
183 log::error!(
184 target: LOG_TARGET,
185 "poll_actions: Background import task is no longer alive"
186 );
187 }
188 }
189
190 async fn run(mut self, link: &dyn Link<B>) {
195 loop {
196 if let Err(_) = self.result_port.next_action(link).await {
197 log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
198 return;
199 }
200 }
201 }
202}
203
/// Message types sent over the channels from `BasicQueueHandle` to the background worker.
mod worker_messages {
    use super::*;

    /// Request to import a batch of blocks: `(origin, blocks)`.
    pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);
    /// Request to import a single justification: `(who, hash, number, justification)`.
    pub struct ImportJustification<B: BlockT>(
        pub RuntimeOrigin,
        pub B::Hash,
        pub NumberFor<B>,
        pub Justification,
    );
}
216
217async fn block_import_process<B: BlockT>(
225 mut block_import: BoxBlockImport<B>,
226 verifier: impl Verifier<B>,
227 result_sender: BufferedLinkSender<B>,
228 mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
229 metrics: Option<Metrics>,
230) {
231 loop {
232 let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
233 {
234 Some(blocks) => blocks,
235 None => {
236 log::debug!(
237 target: LOG_TARGET,
238 "Stopping block import because the import channel was closed!",
239 );
240 return;
241 },
242 };
243
244 let res =
245 import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await;
246
247 result_sender.blocks_processed(res.imported, res.block_count, res.results);
248 }
249}
250
/// State owned by the background worker future built in `BlockImportWorker::new`.
struct BlockImportWorker<B: BlockT> {
    /// Sending end of the buffered link used to report results and request justifications.
    result_sender: BufferedLinkSender<B>,
    /// Optional justification importer; when `None`, every justification import is
    /// reported as a failure.
    justification_import: Option<BoxJustificationImport<B>>,
    /// Optional metrics; when present, justification import time is recorded.
    metrics: Option<Metrics>,
}
256
257impl<B: BlockT> BlockImportWorker<B> {
258 fn new<V>(
259 result_sender: BufferedLinkSender<B>,
260 verifier: V,
261 block_import: BoxBlockImport<B>,
262 justification_import: Option<BoxJustificationImport<B>>,
263 metrics: Option<Metrics>,
264 ) -> (
265 impl Future<Output = ()> + Send,
266 TracingUnboundedSender<worker_messages::ImportJustification<B>>,
267 TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
268 )
269 where
270 V: Verifier<B> + 'static,
271 {
272 use worker_messages::*;
273
274 let (justification_sender, mut justification_port) =
275 tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);
276
277 let (block_import_sender, block_import_receiver) =
278 tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);
279
280 let mut worker = BlockImportWorker { result_sender, justification_import, metrics };
281
282 let future = async move {
283 if let Some(justification_import) = worker.justification_import.as_mut() {
285 for (hash, number) in justification_import.on_start().await {
286 worker.result_sender.request_justification(&hash, number);
287 }
288 }
289
290 let block_import_process = block_import_process(
291 block_import,
292 verifier,
293 worker.result_sender.clone(),
294 block_import_receiver,
295 worker.metrics.clone(),
296 );
297 futures::pin_mut!(block_import_process);
298
299 loop {
300 if worker.result_sender.is_closed() {
303 log::debug!(
304 target: LOG_TARGET,
305 "Stopping block import because result channel was closed!",
306 );
307 return;
308 }
309
310 while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
312 match justification {
313 Some(ImportJustification(who, hash, number, justification)) => {
314 worker.import_justification(who, hash, number, justification).await
315 },
316 None => {
317 log::debug!(
318 target: LOG_TARGET,
319 "Stopping block import because justification channel was closed!",
320 );
321 return;
322 },
323 }
324 }
325
326 if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
327 return;
328 }
329
330 futures::pending!()
332 }
333 };
334
335 (future, justification_sender, block_import_sender)
336 }
337
338 async fn import_justification(
339 &mut self,
340 who: RuntimeOrigin,
341 hash: B::Hash,
342 number: NumberFor<B>,
343 justification: Justification,
344 ) {
345 let started = std::time::Instant::now();
346
347 let import_result = match self.justification_import.as_mut() {
348 Some(justification_import) => {
349 let result = justification_import
350 .import_justification(hash, number, justification)
351 .await
352 .map_err(|e| {
353 debug!(
354 target: LOG_TARGET,
355 "Justification import failed for hash = {:?} with number = {:?} coming from node = {:?} with error: {}",
356 hash,
357 number,
358 who,
359 e,
360 );
361 e
362 });
363 match result {
364 Ok(()) => JustificationImportResult::Success,
365 Err(sp_consensus::Error::OutdatedJustification) => {
366 JustificationImportResult::OutdatedJustification
367 },
368 Err(_) => JustificationImportResult::Failure,
369 }
370 },
371 None => JustificationImportResult::Failure,
372 };
373
374 if let Some(metrics) = self.metrics.as_ref() {
375 metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
376 }
377
378 self.result_sender.justification_imported(who, &hash, number, import_result);
379 }
380}
381
/// Outcome of `import_many_blocks` for one batch.
struct ImportManyBlocksResult<B: BlockT> {
    /// Number of blocks whose import result was `Ok`.
    imported: usize,
    /// Number of blocks in the submitted batch.
    block_count: usize,
    /// Per-block import result paired with the block hash, in processing order.
    results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
}
391
392async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
397 import_handle: &mut BoxBlockImport<B>,
398 blocks_origin: BlockOrigin,
399 blocks: Vec<IncomingBlock<B>>,
400 verifier: &V,
401 metrics: Option<Metrics>,
402) -> ImportManyBlocksResult<B> {
403 let count = blocks.len();
404
405 let blocks_range = match (
406 blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
407 blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
408 ) {
409 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
410 (Some(first), Some(_)) => format!(" ({})", first),
411 _ => Default::default(),
412 };
413
414 debug!(target: LOG_TARGET, "Starting import of {count} blocks {blocks_range} (origin: {blocks_origin:?})");
415
416 let mut imported = 0;
417 let mut results = vec![];
418 let mut has_error = false;
419 let mut blocks = blocks.into_iter();
420
421 loop {
423 let block = match blocks.next() {
425 Some(b) => b,
426 None => {
427 debug!(target: LOG_TARGET, "Imported {imported} out of {count} blocks (origin: {blocks_origin:?})");
429 return ImportManyBlocksResult { block_count: count, imported, results };
430 },
431 };
432
433 let block_number = block.header.as_ref().map(|h| *h.number());
434 let block_hash = block.hash;
435 let import_result = if has_error {
436 Err(BlockImportError::Cancelled)
437 } else {
438 let verification_fut = verify_single_block_metered(
439 import_handle,
440 blocks_origin,
441 block,
442 verifier,
443 metrics.as_ref(),
444 );
445 match verification_fut.await {
446 Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
447 Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => {
448 import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
450 .await
451 },
452 Err(e) => Err(e),
453 }
454 };
455
456 if let Some(metrics) = metrics.as_ref() {
457 metrics.report_import::<B>(&import_result);
458 }
459
460 if import_result.is_ok() {
461 trace!(
462 target: LOG_TARGET,
463 "Block imported successfully {:?} ({})",
464 block_number,
465 block_hash,
466 );
467 imported += 1;
468 } else {
469 has_error = true;
470 }
471
472 results.push((import_result, block_hash));
473
474 Yield::new().await
475 }
476}
477
/// Future that returns `Poll::Pending` exactly once and `Poll::Ready(())` afterwards.
///
/// On the first poll it wakes its own waker before returning `Pending`. The boolean
/// records whether that first poll has already happened.
struct Yield(bool);

impl Yield {
    /// Creates a `Yield` that has not been polled yet.
    fn new() -> Self {
        Yield(false)
    }
}

impl Future for Yield {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        // Mark as polled and learn whether this is a repeat poll in one step.
        let already_yielded = std::mem::replace(&mut self.0, true);
        if already_yielded {
            return Poll::Ready(());
        }

        cx.waker().wake_by_ref();
        Poll::Pending
    }
}
504
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        block_import::{
            BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
        },
        import_queue::Verifier,
    };
    use futures::{executor::block_on, Future};
    use parking_lot::Mutex;
    use sp_test_primitives::{Block, BlockNumber, Hash, Header};

    // Test verifier: accepts every block, rebuilding the params from origin and header only.
    #[async_trait::async_trait]
    impl Verifier<Block> for () {
        async fn verify(
            &self,
            block: BlockImportParams<Block>,
        ) -> Result<BlockImportParams<Block>, String> {
            Ok(BlockImportParams::new(block.origin, block.header))
        }
    }

    // Test block import: every check and every import succeeds.
    #[async_trait::async_trait]
    impl BlockImport<Block> for () {
        type Error = sp_consensus::Error;

        async fn check_block(
            &self,
            _block: BlockCheckParams<Block>,
        ) -> Result<ImportResult, Self::Error> {
            Ok(ImportResult::imported(false))
        }

        async fn import_block(
            &self,
            _block: BlockImportParams<Block>,
        ) -> Result<ImportResult, Self::Error> {
            Ok(ImportResult::imported(true))
        }
    }

    // Test justification import: requests nothing on start and accepts every justification.
    #[async_trait::async_trait]
    impl JustificationImport<Block> for () {
        type Error = sp_consensus::Error;

        async fn on_start(&mut self) -> Vec<(Hash, BlockNumber)> {
            Vec::new()
        }

        async fn import_justification(
            &mut self,
            _hash: Hash,
            _number: BlockNumber,
            _justification: Justification,
        ) -> Result<(), Self::Error> {
            Ok(())
        }
    }

    /// Events recorded by `TestLink`, in the order the link callbacks were invoked.
    #[derive(Debug, PartialEq)]
    enum Event {
        JustificationImported(Hash),
        BlockImported(Hash),
    }

    /// `Link` implementation that records the callbacks it receives.
    #[derive(Default)]
    struct TestLink {
        events: Mutex<Vec<Event>>,
    }

    impl Link<Block> for TestLink {
        fn blocks_processed(
            &self,
            _imported: usize,
            _count: usize,
            results: Vec<(Result<BlockImportStatus<BlockNumber>, BlockImportError>, Hash)>,
        ) {
            // Record only the hash of the first successfully imported block of the batch.
            if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
                self.events.lock().push(Event::BlockImported(hash));
            }
        }

        fn justification_imported(
            &self,
            _who: RuntimeOrigin,
            hash: &Hash,
            _number: BlockNumber,
            _import_result: JustificationImportResult,
        ) {
            self.events.lock().push(Event::JustificationImported(*hash))
        }
    }

    /// Queues blocks and justifications interleaved and checks that the worker reports
    /// all justification imports before any block import.
    #[test]
    fn prioritizes_finality_work_over_block_import() {
        let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);

        let (worker, finality_sender, block_import_sender) =
            BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
        futures::pin_mut!(worker);

        // Sends a single-block batch with number `n` to the worker and returns the block hash.
        let import_block = |n| {
            let header = Header {
                parent_hash: Hash::random(),
                number: n,
                extrinsics_root: Hash::random(),
                state_root: Default::default(),
                digest: Default::default(),
            };

            let hash = header.hash();

            block_import_sender
                .unbounded_send(worker_messages::ImportBlocks(
                    BlockOrigin::Own,
                    vec![IncomingBlock {
                        hash,
                        header: Some(header),
                        body: None,
                        indexed_body: None,
                        justifications: None,
                        origin: None,
                        allow_missing_state: false,
                        import_existing: false,
                        state: None,
                        skip_execution: false,
                    }],
                ))
                .unwrap();

            hash
        };

        // Sends a justification for a random hash to the worker and returns that hash.
        let import_justification = || {
            let hash = Hash::random();
            finality_sender
                .unbounded_send(worker_messages::ImportJustification(
                    sc_network_types::PeerId::random(),
                    hash,
                    1,
                    (*b"TEST", Vec::new()),
                ))
                .unwrap();

            hash
        };

        let link = TestLink::default();

        // Queue everything before the worker is polled for the first time:
        // blocks 1-3, two justifications, blocks 4-6, one more justification.
        let block1 = import_block(1);
        let block2 = import_block(2);
        let block3 = import_block(3);
        let justification1 = import_justification();
        let justification2 = import_justification();
        let block4 = import_block(4);
        let block5 = import_block(5);
        let block6 = import_block(6);
        let justification3 = import_justification();

        block_on(futures::future::poll_fn(|cx| {
            // Keep driving the worker and the result port until all nine events have arrived.
            while link.events.lock().len() < 9 {
                match Future::poll(Pin::new(&mut worker), cx) {
                    Poll::Pending => {},
                    Poll::Ready(()) => panic!("import queue worker should not conclude."),
                }

                result_port.poll_actions(cx, &link).unwrap();
            }

            Poll::Ready(())
        }));

        // All justification events must precede all block events, each group in send order.
        assert_eq!(
            &*link.events.lock(),
            &[
                Event::JustificationImported(justification1),
                Event::JustificationImported(justification2),
                Event::JustificationImported(justification3),
                Event::BlockImported(block1),
                Event::BlockImported(block2),
                Event::BlockImported(block3),
                Event::BlockImported(block4),
                Event::BlockImported(block5),
                Event::BlockImported(block6),
            ]
        );
    }
}