1use crate::consensus::BlockOrigin;
8use crate::utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
9use futures::{
10 prelude::*,
11 task::{Context, Poll},
12};
13use log::{debug, trace};
14use soil_prometheus::Registry;
15use std::pin::Pin;
16use subsoil::runtime::{
17 traits::{Block as BlockT, Header as HeaderT, NumberFor},
18 Justification, Justifications,
19};
20
21use crate::import::{
22 metrics::Metrics,
23 queue::{
24 buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
25 import_single_block_metered, verify_single_block_metered, BlockImportError,
26 BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService,
27 IncomingBlock, JustificationImportResult, Link, RuntimeOrigin,
28 SingleBlockVerificationOutcome, Verifier, LOG_TARGET,
29 },
30};
31
32pub struct BasicQueue<B: BlockT> {
35 handle: BasicQueueHandle<B>,
37 result_port: BufferedLinkReceiver<B>,
39}
40
41impl<B: BlockT> Drop for BasicQueue<B> {
42 fn drop(&mut self) {
43 self.handle.close();
45 self.result_port.close();
46 }
47}
48
49impl<B: BlockT> BasicQueue<B> {
50 pub fn new<V>(
54 verifier: V,
55 block_import: BoxBlockImport<B>,
56 justification_import: Option<BoxJustificationImport<B>>,
57 spawner: &impl subsoil::core::traits::SpawnEssentialNamed,
58 prometheus_registry: Option<&Registry>,
59 ) -> Self
60 where
61 V: Verifier<B> + 'static,
62 {
63 let (result_sender, result_port) = buffered_link::buffered_link(100_000);
64
65 let metrics = prometheus_registry.and_then(|r| {
66 Metrics::register(r)
67 .map_err(|err| {
68 log::warn!("Failed to register Prometheus metrics: {}", err);
69 })
70 .ok()
71 });
72
73 let (future, justification_sender, block_import_sender) = BlockImportWorker::new(
74 result_sender,
75 verifier,
76 block_import,
77 justification_import,
78 metrics,
79 );
80
81 spawner.spawn_essential_blocking(
82 "basic-block-import-worker",
83 Some("block-import"),
84 future.boxed(),
85 );
86
87 Self {
88 handle: BasicQueueHandle::new(justification_sender, block_import_sender),
89 result_port,
90 }
91 }
92}
93
94#[derive(Clone)]
95struct BasicQueueHandle<B: BlockT> {
96 justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
98 block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
100}
101
102impl<B: BlockT> BasicQueueHandle<B> {
103 pub fn new(
104 justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
105 block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
106 ) -> Self {
107 Self { justification_sender, block_import_sender }
108 }
109
110 pub fn close(&mut self) {
111 self.justification_sender.close();
112 self.block_import_sender.close();
113 }
114}
115
116impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
117 fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
118 if blocks.is_empty() {
119 return;
120 }
121
122 trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());
123 let res = self
124 .block_import_sender
125 .unbounded_send(worker_messages::ImportBlocks(origin, blocks));
126
127 if res.is_err() {
128 log::error!(
129 target: LOG_TARGET,
130 "import_blocks: Background import task is no longer alive"
131 );
132 }
133 }
134
135 fn import_justifications(
136 &mut self,
137 who: RuntimeOrigin,
138 hash: B::Hash,
139 number: NumberFor<B>,
140 justifications: Justifications,
141 ) {
142 for justification in justifications {
143 let res = self.justification_sender.unbounded_send(
144 worker_messages::ImportJustification(who.clone(), hash, number, justification),
145 );
146
147 if res.is_err() {
148 log::error!(
149 target: LOG_TARGET,
150 "import_justification: Background import task is no longer alive"
151 );
152 }
153 }
154 }
155}
156
157#[async_trait::async_trait]
158impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
159 fn service(&self) -> Box<dyn ImportQueueService<B>> {
161 Box::new(self.handle.clone())
162 }
163
164 fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
166 &mut self.handle
167 }
168
169 fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) {
171 if self.result_port.poll_actions(cx, link).is_err() {
172 log::error!(
173 target: LOG_TARGET,
174 "poll_actions: Background import task is no longer alive"
175 );
176 }
177 }
178
179 async fn run(mut self, link: &dyn Link<B>) {
184 loop {
185 if let Err(_) = self.result_port.next_action(link).await {
186 log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
187 return;
188 }
189 }
190 }
191}
192
193mod worker_messages {
195 use super::*;
196
197 pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);
198 pub struct ImportJustification<B: BlockT>(
199 pub RuntimeOrigin,
200 pub B::Hash,
201 pub NumberFor<B>,
202 pub Justification,
203 );
204}
205
206async fn block_import_process<B: BlockT>(
214 mut block_import: BoxBlockImport<B>,
215 verifier: impl Verifier<B>,
216 result_sender: BufferedLinkSender<B>,
217 mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
218 metrics: Option<Metrics>,
219) {
220 loop {
221 let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
222 {
223 Some(blocks) => blocks,
224 None => {
225 log::debug!(
226 target: LOG_TARGET,
227 "Stopping block import because the import channel was closed!",
228 );
229 return;
230 },
231 };
232
233 let res =
234 import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await;
235
236 result_sender.blocks_processed(res.imported, res.block_count, res.results);
237 }
238}
239
240struct BlockImportWorker<B: BlockT> {
241 result_sender: BufferedLinkSender<B>,
242 justification_import: Option<BoxJustificationImport<B>>,
243 metrics: Option<Metrics>,
244}
245
246impl<B: BlockT> BlockImportWorker<B> {
247 fn new<V>(
248 result_sender: BufferedLinkSender<B>,
249 verifier: V,
250 block_import: BoxBlockImport<B>,
251 justification_import: Option<BoxJustificationImport<B>>,
252 metrics: Option<Metrics>,
253 ) -> (
254 impl Future<Output = ()> + Send,
255 TracingUnboundedSender<worker_messages::ImportJustification<B>>,
256 TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
257 )
258 where
259 V: Verifier<B> + 'static,
260 {
261 use worker_messages::*;
262
263 let (justification_sender, mut justification_port) =
264 tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);
265
266 let (block_import_sender, block_import_receiver) =
267 tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);
268
269 let mut worker = BlockImportWorker { result_sender, justification_import, metrics };
270
271 let future = async move {
272 if let Some(justification_import) = worker.justification_import.as_mut() {
274 for (hash, number) in justification_import.on_start().await {
275 worker.result_sender.request_justification(&hash, number);
276 }
277 }
278
279 let block_import_process = block_import_process(
280 block_import,
281 verifier,
282 worker.result_sender.clone(),
283 block_import_receiver,
284 worker.metrics.clone(),
285 );
286 futures::pin_mut!(block_import_process);
287
288 loop {
289 if worker.result_sender.is_closed() {
292 log::debug!(
293 target: LOG_TARGET,
294 "Stopping block import because result channel was closed!",
295 );
296 return;
297 }
298
299 while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
301 match justification {
302 Some(ImportJustification(who, hash, number, justification)) => {
303 worker.import_justification(who, hash, number, justification).await
304 },
305 None => {
306 log::debug!(
307 target: LOG_TARGET,
308 "Stopping block import because justification channel was closed!",
309 );
310 return;
311 },
312 }
313 }
314
315 if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
316 return;
317 }
318
319 futures::pending!()
321 }
322 };
323
324 (future, justification_sender, block_import_sender)
325 }
326
327 async fn import_justification(
328 &mut self,
329 who: RuntimeOrigin,
330 hash: B::Hash,
331 number: NumberFor<B>,
332 justification: Justification,
333 ) {
334 let started = std::time::Instant::now();
335
336 let import_result = match self.justification_import.as_mut() {
337 Some(justification_import) => {
338 let result = justification_import
339 .import_justification(hash, number, justification)
340 .await
341 .map_err(|e| {
342 debug!(
343 target: LOG_TARGET,
344 "Justification import failed for hash = {:?} with number = {:?} coming from node = {:?} with error: {}",
345 hash,
346 number,
347 who,
348 e,
349 );
350 e
351 });
352 match result {
353 Ok(()) => JustificationImportResult::Success,
354 Err(crate::consensus::Error::OutdatedJustification) => {
355 JustificationImportResult::OutdatedJustification
356 },
357 Err(_) => JustificationImportResult::Failure,
358 }
359 },
360 None => JustificationImportResult::Failure,
361 };
362
363 if let Some(metrics) = self.metrics.as_ref() {
364 metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
365 }
366
367 self.result_sender.justification_imported(who, &hash, number, import_result);
368 }
369}
370
371struct ImportManyBlocksResult<B: BlockT> {
373 imported: usize,
375 block_count: usize,
377 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
379}
380
381async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
386 import_handle: &mut BoxBlockImport<B>,
387 blocks_origin: BlockOrigin,
388 blocks: Vec<IncomingBlock<B>>,
389 verifier: &V,
390 metrics: Option<Metrics>,
391) -> ImportManyBlocksResult<B> {
392 let count = blocks.len();
393
394 let blocks_range = match (
395 blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
396 blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
397 ) {
398 (Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
399 (Some(first), Some(_)) => format!(" ({})", first),
400 _ => Default::default(),
401 };
402
403 debug!(target: LOG_TARGET, "Starting import of {count} blocks {blocks_range} (origin: {blocks_origin:?})");
404
405 let mut imported = 0;
406 let mut results = vec![];
407 let mut has_error = false;
408 let mut blocks = blocks.into_iter();
409
410 loop {
412 let block = match blocks.next() {
414 Some(b) => b,
415 None => {
416 debug!(target: LOG_TARGET, "Imported {imported} out of {count} blocks (origin: {blocks_origin:?})");
418 return ImportManyBlocksResult { block_count: count, imported, results };
419 },
420 };
421
422 let block_number = block.header.as_ref().map(|h| *h.number());
423 let block_hash = block.hash;
424 let import_result = if has_error {
425 Err(BlockImportError::Cancelled)
426 } else {
427 let verification_fut = verify_single_block_metered(
428 import_handle,
429 blocks_origin,
430 block,
431 verifier,
432 metrics.as_ref(),
433 );
434 match verification_fut.await {
435 Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
436 Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => {
437 import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
439 .await
440 },
441 Err(e) => Err(e),
442 }
443 };
444
445 if let Some(metrics) = metrics.as_ref() {
446 metrics.report_import::<B>(&import_result);
447 }
448
449 if import_result.is_ok() {
450 trace!(
451 target: LOG_TARGET,
452 "Block imported successfully {:?} ({})",
453 block_number,
454 block_hash,
455 );
456 imported += 1;
457 } else {
458 has_error = true;
459 }
460
461 results.push((import_result, block_hash));
462
463 Yield::new().await
464 }
465}
466
467struct Yield(bool);
473
474impl Yield {
475 fn new() -> Self {
476 Self(false)
477 }
478}
479
480impl Future for Yield {
481 type Output = ();
482
483 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
484 if !self.0 {
485 self.0 = true;
486 cx.waker().wake_by_ref();
487 Poll::Pending
488 } else {
489 Poll::Ready(())
490 }
491 }
492}
493
494#[cfg(test)]
495mod tests {
496 use super::*;
497 use crate::{
498 import::block_import::{
499 BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
500 },
501 import::queue::Verifier,
502 };
503 use futures::{executor::block_on, Future};
504 use parking_lot::Mutex;
505 use soil_test::primitives::{Block, BlockNumber, Hash, Header};
506
507 #[async_trait::async_trait]
508 impl Verifier<Block> for () {
509 async fn verify(
510 &self,
511 block: BlockImportParams<Block>,
512 ) -> Result<BlockImportParams<Block>, String> {
513 Ok(BlockImportParams::new(block.origin, block.header))
514 }
515 }
516
517 #[async_trait::async_trait]
518 impl BlockImport<Block> for () {
519 type Error = crate::consensus::Error;
520
521 async fn check_block(
522 &self,
523 _block: BlockCheckParams<Block>,
524 ) -> Result<ImportResult, Self::Error> {
525 Ok(ImportResult::imported(false))
526 }
527
528 async fn import_block(
529 &self,
530 _block: BlockImportParams<Block>,
531 ) -> Result<ImportResult, Self::Error> {
532 Ok(ImportResult::imported(true))
533 }
534 }
535
536 #[async_trait::async_trait]
537 impl JustificationImport<Block> for () {
538 type Error = crate::consensus::Error;
539
540 async fn on_start(&mut self) -> Vec<(Hash, BlockNumber)> {
541 Vec::new()
542 }
543
544 async fn import_justification(
545 &mut self,
546 _hash: Hash,
547 _number: BlockNumber,
548 _justification: Justification,
549 ) -> Result<(), Self::Error> {
550 Ok(())
551 }
552 }
553
554 #[derive(Debug, PartialEq)]
555 enum Event {
556 JustificationImported(Hash),
557 BlockImported(Hash),
558 }
559
560 #[derive(Default)]
561 struct TestLink {
562 events: Mutex<Vec<Event>>,
563 }
564
565 impl Link<Block> for TestLink {
566 fn blocks_processed(
567 &self,
568 _imported: usize,
569 _count: usize,
570 results: Vec<(Result<BlockImportStatus<BlockNumber>, BlockImportError>, Hash)>,
571 ) {
572 if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
573 self.events.lock().push(Event::BlockImported(hash));
574 }
575 }
576
577 fn justification_imported(
578 &self,
579 _who: RuntimeOrigin,
580 hash: &Hash,
581 _number: BlockNumber,
582 _import_result: JustificationImportResult,
583 ) {
584 self.events.lock().push(Event::JustificationImported(*hash))
585 }
586 }
587
588 #[test]
589 fn prioritizes_finality_work_over_block_import() {
590 let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);
591
592 let (worker, finality_sender, block_import_sender) =
593 BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
594 futures::pin_mut!(worker);
595
596 let import_block = |n| {
597 let header = Header {
598 parent_hash: Hash::random(),
599 number: n,
600 extrinsics_root: Hash::random(),
601 state_root: Default::default(),
602 digest: Default::default(),
603 };
604
605 let hash = header.hash();
606
607 block_import_sender
608 .unbounded_send(worker_messages::ImportBlocks(
609 BlockOrigin::Own,
610 vec![IncomingBlock {
611 hash,
612 header: Some(header),
613 body: None,
614 indexed_body: None,
615 justifications: None,
616 origin: None,
617 allow_missing_state: false,
618 import_existing: false,
619 state: None,
620 skip_execution: false,
621 }],
622 ))
623 .unwrap();
624
625 hash
626 };
627
628 let import_justification = || {
629 let hash = Hash::random();
630 finality_sender
631 .unbounded_send(worker_messages::ImportJustification(
632 RuntimeOrigin::from(vec![1, 2, 3]),
633 hash,
634 1,
635 (*b"TEST", Vec::new()),
636 ))
637 .unwrap();
638
639 hash
640 };
641
642 let link = TestLink::default();
643
644 let block1 = import_block(1);
646 let block2 = import_block(2);
647 let block3 = import_block(3);
648 let justification1 = import_justification();
649 let justification2 = import_justification();
650 let block4 = import_block(4);
651 let block5 = import_block(5);
652 let block6 = import_block(6);
653 let justification3 = import_justification();
654
655 block_on(futures::future::poll_fn(|cx| {
657 while link.events.lock().len() < 9 {
658 match Future::poll(Pin::new(&mut worker), cx) {
659 Poll::Pending => {},
660 Poll::Ready(()) => panic!("import queue worker should not conclude."),
661 }
662
663 result_port.poll_actions(cx, &link).unwrap();
664 }
665
666 Poll::Ready(())
667 }));
668
669 assert_eq!(
671 &*link.events.lock(),
672 &[
673 Event::JustificationImported(justification1),
674 Event::JustificationImported(justification2),
675 Event::JustificationImported(justification3),
676 Event::BlockImported(block1),
677 Event::BlockImported(block2),
678 Event::BlockImported(block3),
679 Event::BlockImported(block4),
680 Event::BlockImported(block5),
681 Event::BlockImported(block6),
682 ]
683 );
684 }
685}