Skip to main content

soil_client/import/queue/
basic_queue.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7use crate::consensus::BlockOrigin;
8use crate::utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
9use futures::{
10	prelude::*,
11	task::{Context, Poll},
12};
13use log::{debug, trace};
14use soil_prometheus::Registry;
15use std::pin::Pin;
16use subsoil::runtime::{
17	traits::{Block as BlockT, Header as HeaderT, NumberFor},
18	Justification, Justifications,
19};
20
21use crate::import::{
22	metrics::Metrics,
23	queue::{
24		buffered_link::{self, BufferedLinkReceiver, BufferedLinkSender},
25		import_single_block_metered, verify_single_block_metered, BlockImportError,
26		BlockImportStatus, BoxBlockImport, BoxJustificationImport, ImportQueue, ImportQueueService,
27		IncomingBlock, JustificationImportResult, Link, RuntimeOrigin,
28		SingleBlockVerificationOutcome, Verifier, LOG_TARGET,
29	},
30};
31
32/// Interface to a basic block import queue that is importing blocks sequentially in a separate
33/// task, with plugable verification.
34pub struct BasicQueue<B: BlockT> {
35	/// Handle for sending justification and block import messages to the background task.
36	handle: BasicQueueHandle<B>,
37	/// Results coming from the worker task.
38	result_port: BufferedLinkReceiver<B>,
39}
40
41impl<B: BlockT> Drop for BasicQueue<B> {
42	fn drop(&mut self) {
43		// Flush the queue and close the receiver to terminate the future.
44		self.handle.close();
45		self.result_port.close();
46	}
47}
48
49impl<B: BlockT> BasicQueue<B> {
50	/// Instantiate a new basic queue, with given verifier.
51	///
52	/// This creates a background task, and calls `on_start` on the justification importer.
53	pub fn new<V>(
54		verifier: V,
55		block_import: BoxBlockImport<B>,
56		justification_import: Option<BoxJustificationImport<B>>,
57		spawner: &impl subsoil::core::traits::SpawnEssentialNamed,
58		prometheus_registry: Option<&Registry>,
59	) -> Self
60	where
61		V: Verifier<B> + 'static,
62	{
63		let (result_sender, result_port) = buffered_link::buffered_link(100_000);
64
65		let metrics = prometheus_registry.and_then(|r| {
66			Metrics::register(r)
67				.map_err(|err| {
68					log::warn!("Failed to register Prometheus metrics: {}", err);
69				})
70				.ok()
71		});
72
73		let (future, justification_sender, block_import_sender) = BlockImportWorker::new(
74			result_sender,
75			verifier,
76			block_import,
77			justification_import,
78			metrics,
79		);
80
81		spawner.spawn_essential_blocking(
82			"basic-block-import-worker",
83			Some("block-import"),
84			future.boxed(),
85		);
86
87		Self {
88			handle: BasicQueueHandle::new(justification_sender, block_import_sender),
89			result_port,
90		}
91	}
92}
93
94#[derive(Clone)]
95struct BasicQueueHandle<B: BlockT> {
96	/// Channel to send justification import messages to the background task.
97	justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
98	/// Channel to send block import messages to the background task.
99	block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
100}
101
102impl<B: BlockT> BasicQueueHandle<B> {
103	pub fn new(
104		justification_sender: TracingUnboundedSender<worker_messages::ImportJustification<B>>,
105		block_import_sender: TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
106	) -> Self {
107		Self { justification_sender, block_import_sender }
108	}
109
110	pub fn close(&mut self) {
111		self.justification_sender.close();
112		self.block_import_sender.close();
113	}
114}
115
116impl<B: BlockT> ImportQueueService<B> for BasicQueueHandle<B> {
117	fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>) {
118		if blocks.is_empty() {
119			return;
120		}
121
122		trace!(target: LOG_TARGET, "Scheduling {} blocks for import", blocks.len());
123		let res = self
124			.block_import_sender
125			.unbounded_send(worker_messages::ImportBlocks(origin, blocks));
126
127		if res.is_err() {
128			log::error!(
129				target: LOG_TARGET,
130				"import_blocks: Background import task is no longer alive"
131			);
132		}
133	}
134
135	fn import_justifications(
136		&mut self,
137		who: RuntimeOrigin,
138		hash: B::Hash,
139		number: NumberFor<B>,
140		justifications: Justifications,
141	) {
142		for justification in justifications {
143			let res = self.justification_sender.unbounded_send(
144				worker_messages::ImportJustification(who.clone(), hash, number, justification),
145			);
146
147			if res.is_err() {
148				log::error!(
149					target: LOG_TARGET,
150					"import_justification: Background import task is no longer alive"
151				);
152			}
153		}
154	}
155}
156
157#[async_trait::async_trait]
158impl<B: BlockT> ImportQueue<B> for BasicQueue<B> {
159	/// Get handle to [`ImportQueueService`].
160	fn service(&self) -> Box<dyn ImportQueueService<B>> {
161		Box::new(self.handle.clone())
162	}
163
164	/// Get a reference to the handle to [`ImportQueueService`].
165	fn service_ref(&mut self) -> &mut dyn ImportQueueService<B> {
166		&mut self.handle
167	}
168
169	/// Poll actions from network.
170	fn poll_actions(&mut self, cx: &mut Context, link: &dyn Link<B>) {
171		if self.result_port.poll_actions(cx, link).is_err() {
172			log::error!(
173				target: LOG_TARGET,
174				"poll_actions: Background import task is no longer alive"
175			);
176		}
177	}
178
179	/// Start asynchronous runner for import queue.
180	///
181	/// Takes an object implementing [`Link`] which allows the import queue to
182	/// influence the synchronization process.
183	async fn run(mut self, link: &dyn Link<B>) {
184		loop {
185			if let Err(_) = self.result_port.next_action(link).await {
186				log::error!(target: "sync", "poll_actions: Background import task is no longer alive");
187				return;
188			}
189		}
190	}
191}
192
193/// Messages designated to the background worker.
194mod worker_messages {
195	use super::*;
196
197	pub struct ImportBlocks<B: BlockT>(pub BlockOrigin, pub Vec<IncomingBlock<B>>);
198	pub struct ImportJustification<B: BlockT>(
199		pub RuntimeOrigin,
200		pub B::Hash,
201		pub NumberFor<B>,
202		pub Justification,
203	);
204}
205
206/// The process of importing blocks.
207///
208/// This polls the `block_import_receiver` for new blocks to import and than awaits on
209/// importing these blocks. After each block is imported, this async function yields once
210/// to give other futures the possibility to be run.
211///
212/// Returns when `block_import` ended.
213async fn block_import_process<B: BlockT>(
214	mut block_import: BoxBlockImport<B>,
215	verifier: impl Verifier<B>,
216	result_sender: BufferedLinkSender<B>,
217	mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
218	metrics: Option<Metrics>,
219) {
220	loop {
221		let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
222		{
223			Some(blocks) => blocks,
224			None => {
225				log::debug!(
226					target: LOG_TARGET,
227					"Stopping block import because the import channel was closed!",
228				);
229				return;
230			},
231		};
232
233		let res =
234			import_many_blocks(&mut block_import, origin, blocks, &verifier, metrics.clone()).await;
235
236		result_sender.blocks_processed(res.imported, res.block_count, res.results);
237	}
238}
239
240struct BlockImportWorker<B: BlockT> {
241	result_sender: BufferedLinkSender<B>,
242	justification_import: Option<BoxJustificationImport<B>>,
243	metrics: Option<Metrics>,
244}
245
246impl<B: BlockT> BlockImportWorker<B> {
247	fn new<V>(
248		result_sender: BufferedLinkSender<B>,
249		verifier: V,
250		block_import: BoxBlockImport<B>,
251		justification_import: Option<BoxJustificationImport<B>>,
252		metrics: Option<Metrics>,
253	) -> (
254		impl Future<Output = ()> + Send,
255		TracingUnboundedSender<worker_messages::ImportJustification<B>>,
256		TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
257	)
258	where
259		V: Verifier<B> + 'static,
260	{
261		use worker_messages::*;
262
263		let (justification_sender, mut justification_port) =
264			tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);
265
266		let (block_import_sender, block_import_receiver) =
267			tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);
268
269		let mut worker = BlockImportWorker { result_sender, justification_import, metrics };
270
271		let future = async move {
272			// Let's initialize `justification_import`
273			if let Some(justification_import) = worker.justification_import.as_mut() {
274				for (hash, number) in justification_import.on_start().await {
275					worker.result_sender.request_justification(&hash, number);
276				}
277			}
278
279			let block_import_process = block_import_process(
280				block_import,
281				verifier,
282				worker.result_sender.clone(),
283				block_import_receiver,
284				worker.metrics.clone(),
285			);
286			futures::pin_mut!(block_import_process);
287
288			loop {
289				// If the results sender is closed, that means that the import queue is shutting
290				// down and we should end this future.
291				if worker.result_sender.is_closed() {
292					log::debug!(
293						target: LOG_TARGET,
294						"Stopping block import because result channel was closed!",
295					);
296					return;
297				}
298
299				// Make sure to first process all justifications
300				while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
301					match justification {
302						Some(ImportJustification(who, hash, number, justification)) => {
303							worker.import_justification(who, hash, number, justification).await
304						},
305						None => {
306							log::debug!(
307								target: LOG_TARGET,
308								"Stopping block import because justification channel was closed!",
309							);
310							return;
311						},
312					}
313				}
314
315				if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
316					return;
317				}
318
319				// All futures that we polled are now pending.
320				futures::pending!()
321			}
322		};
323
324		(future, justification_sender, block_import_sender)
325	}
326
327	async fn import_justification(
328		&mut self,
329		who: RuntimeOrigin,
330		hash: B::Hash,
331		number: NumberFor<B>,
332		justification: Justification,
333	) {
334		let started = std::time::Instant::now();
335
336		let import_result = match self.justification_import.as_mut() {
337			Some(justification_import) => {
338				let result = justification_import
339				.import_justification(hash, number, justification)
340				.await
341				.map_err(|e| {
342					debug!(
343						target: LOG_TARGET,
344						"Justification import failed for hash = {:?} with number = {:?} coming from node = {:?} with error: {}",
345						hash,
346						number,
347						who,
348						e,
349					);
350					e
351				});
352				match result {
353					Ok(()) => JustificationImportResult::Success,
354					Err(crate::consensus::Error::OutdatedJustification) => {
355						JustificationImportResult::OutdatedJustification
356					},
357					Err(_) => JustificationImportResult::Failure,
358				}
359			},
360			None => JustificationImportResult::Failure,
361		};
362
363		if let Some(metrics) = self.metrics.as_ref() {
364			metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
365		}
366
367		self.result_sender.justification_imported(who, &hash, number, import_result);
368	}
369}
370
371/// Result of [`import_many_blocks`].
372struct ImportManyBlocksResult<B: BlockT> {
373	/// The number of blocks imported successfully.
374	imported: usize,
375	/// The total number of blocks processed.
376	block_count: usize,
377	/// The import results for each block.
378	results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
379}
380
381/// Import several blocks at once, returning import result for each block.
382///
383/// This will yield after each imported block once, to ensure that other futures can
384/// be called as well.
385async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
386	import_handle: &mut BoxBlockImport<B>,
387	blocks_origin: BlockOrigin,
388	blocks: Vec<IncomingBlock<B>>,
389	verifier: &V,
390	metrics: Option<Metrics>,
391) -> ImportManyBlocksResult<B> {
392	let count = blocks.len();
393
394	let blocks_range = match (
395		blocks.first().and_then(|b| b.header.as_ref().map(|h| h.number())),
396		blocks.last().and_then(|b| b.header.as_ref().map(|h| h.number())),
397	) {
398		(Some(first), Some(last)) if first != last => format!(" ({}..{})", first, last),
399		(Some(first), Some(_)) => format!(" ({})", first),
400		_ => Default::default(),
401	};
402
403	debug!(target: LOG_TARGET, "Starting import of {count} blocks {blocks_range} (origin: {blocks_origin:?})");
404
405	let mut imported = 0;
406	let mut results = vec![];
407	let mut has_error = false;
408	let mut blocks = blocks.into_iter();
409
410	// Blocks in the response/drain should be in ascending order.
411	loop {
412		// Is there any block left to import?
413		let block = match blocks.next() {
414			Some(b) => b,
415			None => {
416				// No block left to import, success!
417				debug!(target: LOG_TARGET, "Imported {imported} out of {count} blocks (origin: {blocks_origin:?})");
418				return ImportManyBlocksResult { block_count: count, imported, results };
419			},
420		};
421
422		let block_number = block.header.as_ref().map(|h| *h.number());
423		let block_hash = block.hash;
424		let import_result = if has_error {
425			Err(BlockImportError::Cancelled)
426		} else {
427			let verification_fut = verify_single_block_metered(
428				import_handle,
429				blocks_origin,
430				block,
431				verifier,
432				metrics.as_ref(),
433			);
434			match verification_fut.await {
435				Ok(SingleBlockVerificationOutcome::Imported(import_status)) => Ok(import_status),
436				Ok(SingleBlockVerificationOutcome::Verified(import_parameters)) => {
437					// The actual import.
438					import_single_block_metered(import_handle, import_parameters, metrics.as_ref())
439						.await
440				},
441				Err(e) => Err(e),
442			}
443		};
444
445		if let Some(metrics) = metrics.as_ref() {
446			metrics.report_import::<B>(&import_result);
447		}
448
449		if import_result.is_ok() {
450			trace!(
451				target: LOG_TARGET,
452				"Block imported successfully {:?} ({})",
453				block_number,
454				block_hash,
455			);
456			imported += 1;
457		} else {
458			has_error = true;
459		}
460
461		results.push((import_result, block_hash));
462
463		Yield::new().await
464	}
465}
466
467/// A future that will always `yield` on the first call of `poll` but schedules the
468/// current task for re-execution.
469///
470/// This is done by getting the waker and calling `wake_by_ref` followed by returning
471/// `Pending`. The next time the `poll` is called, it will return `Ready`.
472struct Yield(bool);
473
474impl Yield {
475	fn new() -> Self {
476		Self(false)
477	}
478}
479
480impl Future for Yield {
481	type Output = ();
482
483	fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
484		if !self.0 {
485			self.0 = true;
486			cx.waker().wake_by_ref();
487			Poll::Pending
488		} else {
489			Poll::Ready(())
490		}
491	}
492}
493
494#[cfg(test)]
495mod tests {
496	use super::*;
497	use crate::{
498		import::block_import::{
499			BlockCheckParams, BlockImport, BlockImportParams, ImportResult, JustificationImport,
500		},
501		import::queue::Verifier,
502	};
503	use futures::{executor::block_on, Future};
504	use parking_lot::Mutex;
505	use soil_test::primitives::{Block, BlockNumber, Hash, Header};
506
507	#[async_trait::async_trait]
508	impl Verifier<Block> for () {
509		async fn verify(
510			&self,
511			block: BlockImportParams<Block>,
512		) -> Result<BlockImportParams<Block>, String> {
513			Ok(BlockImportParams::new(block.origin, block.header))
514		}
515	}
516
517	#[async_trait::async_trait]
518	impl BlockImport<Block> for () {
519		type Error = crate::consensus::Error;
520
521		async fn check_block(
522			&self,
523			_block: BlockCheckParams<Block>,
524		) -> Result<ImportResult, Self::Error> {
525			Ok(ImportResult::imported(false))
526		}
527
528		async fn import_block(
529			&self,
530			_block: BlockImportParams<Block>,
531		) -> Result<ImportResult, Self::Error> {
532			Ok(ImportResult::imported(true))
533		}
534	}
535
536	#[async_trait::async_trait]
537	impl JustificationImport<Block> for () {
538		type Error = crate::consensus::Error;
539
540		async fn on_start(&mut self) -> Vec<(Hash, BlockNumber)> {
541			Vec::new()
542		}
543
544		async fn import_justification(
545			&mut self,
546			_hash: Hash,
547			_number: BlockNumber,
548			_justification: Justification,
549		) -> Result<(), Self::Error> {
550			Ok(())
551		}
552	}
553
554	#[derive(Debug, PartialEq)]
555	enum Event {
556		JustificationImported(Hash),
557		BlockImported(Hash),
558	}
559
560	#[derive(Default)]
561	struct TestLink {
562		events: Mutex<Vec<Event>>,
563	}
564
565	impl Link<Block> for TestLink {
566		fn blocks_processed(
567			&self,
568			_imported: usize,
569			_count: usize,
570			results: Vec<(Result<BlockImportStatus<BlockNumber>, BlockImportError>, Hash)>,
571		) {
572			if let Some(hash) = results.into_iter().find_map(|(r, h)| r.ok().map(|_| h)) {
573				self.events.lock().push(Event::BlockImported(hash));
574			}
575		}
576
577		fn justification_imported(
578			&self,
579			_who: RuntimeOrigin,
580			hash: &Hash,
581			_number: BlockNumber,
582			_import_result: JustificationImportResult,
583		) {
584			self.events.lock().push(Event::JustificationImported(*hash))
585		}
586	}
587
588	#[test]
589	fn prioritizes_finality_work_over_block_import() {
590		let (result_sender, mut result_port) = buffered_link::buffered_link(100_000);
591
592		let (worker, finality_sender, block_import_sender) =
593			BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None);
594		futures::pin_mut!(worker);
595
596		let import_block = |n| {
597			let header = Header {
598				parent_hash: Hash::random(),
599				number: n,
600				extrinsics_root: Hash::random(),
601				state_root: Default::default(),
602				digest: Default::default(),
603			};
604
605			let hash = header.hash();
606
607			block_import_sender
608				.unbounded_send(worker_messages::ImportBlocks(
609					BlockOrigin::Own,
610					vec![IncomingBlock {
611						hash,
612						header: Some(header),
613						body: None,
614						indexed_body: None,
615						justifications: None,
616						origin: None,
617						allow_missing_state: false,
618						import_existing: false,
619						state: None,
620						skip_execution: false,
621					}],
622				))
623				.unwrap();
624
625			hash
626		};
627
628		let import_justification = || {
629			let hash = Hash::random();
630			finality_sender
631				.unbounded_send(worker_messages::ImportJustification(
632					RuntimeOrigin::from(vec![1, 2, 3]),
633					hash,
634					1,
635					(*b"TEST", Vec::new()),
636				))
637				.unwrap();
638
639			hash
640		};
641
642		let link = TestLink::default();
643
644		// we send a bunch of tasks to the worker
645		let block1 = import_block(1);
646		let block2 = import_block(2);
647		let block3 = import_block(3);
648		let justification1 = import_justification();
649		let justification2 = import_justification();
650		let block4 = import_block(4);
651		let block5 = import_block(5);
652		let block6 = import_block(6);
653		let justification3 = import_justification();
654
655		// we poll the worker until we have processed 9 events
656		block_on(futures::future::poll_fn(|cx| {
657			while link.events.lock().len() < 9 {
658				match Future::poll(Pin::new(&mut worker), cx) {
659					Poll::Pending => {},
660					Poll::Ready(()) => panic!("import queue worker should not conclude."),
661				}
662
663				result_port.poll_actions(cx, &link).unwrap();
664			}
665
666			Poll::Ready(())
667		}));
668
669		// all justification tasks must be done before any block import work
670		assert_eq!(
671			&*link.events.lock(),
672			&[
673				Event::JustificationImported(justification1),
674				Event::JustificationImported(justification2),
675				Event::JustificationImported(justification3),
676				Event::BlockImported(block1),
677				Event::BlockImported(block2),
678				Event::BlockImported(block3),
679				Event::BlockImported(block4),
680				Event::BlockImported(block5),
681				Event::BlockImported(block6),
682			]
683		);
684	}
685}