pub trait Link<B: BlockT>: Send {
    fn blocks_processed(
        &mut self,
        _imported: usize,
        _count: usize,
        _results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>
    ) { ... } fn justification_imported(
        &mut self,
        _who: RuntimeOrigin,
        _hash: &B::Hash,
        _number: NumberFor<B>,
        _success: bool
    ) { ... } fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>) { ... } }
Expand description

Hooks that the verification queue can use to influence the synchronization algorithm.

Provided Methods§

Batch of blocks imported, with or without error.

Examples found in repository?
src/import_queue/buffered_link.rs (line 129)
126
127
128
129
130
131
132
133
134
135
	pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &mut dyn Link<B>) {
		match msg {
			BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
				link.blocks_processed(imported, count, results),
			BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
				link.justification_imported(who, &hash, number, success),
			BlockImportWorkerMsg::RequestJustification(hash, number) =>
				link.request_justification(&hash, number),
		}
	}
More examples
Hide additional examples
src/import_queue/basic_queue.rs (line 254)
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
async fn block_import_process<B: BlockT, Transaction: Send + 'static>(
	mut block_import: BoxBlockImport<B, Transaction>,
	mut verifier: impl Verifier<B>,
	mut result_sender: BufferedLinkSender<B>,
	mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
	metrics: Option<Metrics>,
	delay_between_blocks: Duration,
) {
	loop {
		let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
		{
			Some(blocks) => blocks,
			None => {
				log::debug!(
					target: LOG_TARGET,
					"Stopping block import because the import channel was closed!",
				);
				return
			},
		};

		let res = import_many_blocks(
			&mut block_import,
			origin,
			blocks,
			&mut verifier,
			delay_between_blocks,
			metrics.clone(),
		)
		.await;

		result_sender.blocks_processed(res.imported, res.block_count, res.results);
	}
}

Justification import result.

Examples found in repository?
src/import_queue/buffered_link.rs (line 131)
126
127
128
129
130
131
132
133
134
135
	pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &mut dyn Link<B>) {
		match msg {
			BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
				link.blocks_processed(imported, count, results),
			BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
				link.justification_imported(who, &hash, number, success),
			BlockImportWorkerMsg::RequestJustification(hash, number) =>
				link.request_justification(&hash, number),
		}
	}
More examples
Hide additional examples
src/import_queue/basic_queue.rs (line 376)
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
	async fn import_justification(
		&mut self,
		who: RuntimeOrigin,
		hash: B::Hash,
		number: NumberFor<B>,
		justification: Justification,
	) {
		let started = std::time::Instant::now();

		let success = match self.justification_import.as_mut() {
			Some(justification_import) => justification_import
				.import_justification(hash, number, justification)
				.await
				.map_err(|e| {
					debug!(
						target: LOG_TARGET,
						"Justification import failed for hash = {:?} with number = {:?} coming from node = {:?} with error: {}",
						hash,
						number,
						who,
						e,
					);
					e
				})
				.is_ok(),
			None => false,
		};

		if let Some(metrics) = self.metrics.as_ref() {
			metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
		}

		self.result_sender.justification_imported(who, &hash, number, success);
	}

Request a justification for the given block.

Examples found in repository?
src/import_queue/buffered_link.rs (line 133)
126
127
128
129
130
131
132
133
134
135
	pub fn send_actions(&mut self, msg: BlockImportWorkerMsg<B>, link: &mut dyn Link<B>) {
		match msg {
			BlockImportWorkerMsg::BlocksProcessed(imported, count, results) =>
				link.blocks_processed(imported, count, results),
			BlockImportWorkerMsg::JustificationImported(who, hash, number, success) =>
				link.justification_imported(who, &hash, number, success),
			BlockImportWorkerMsg::RequestJustification(hash, number) =>
				link.request_justification(&hash, number),
		}
	}
More examples
Hide additional examples
src/import_queue/basic_queue.rs (line 292)
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
	fn new<V: 'static + Verifier<B>, Transaction: Send + 'static>(
		result_sender: BufferedLinkSender<B>,
		verifier: V,
		block_import: BoxBlockImport<B, Transaction>,
		justification_import: Option<BoxJustificationImport<B>>,
		metrics: Option<Metrics>,
	) -> (
		impl Future<Output = ()> + Send,
		TracingUnboundedSender<worker_messages::ImportJustification<B>>,
		TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
	) {
		use worker_messages::*;

		let (justification_sender, mut justification_port) =
			tracing_unbounded("mpsc_import_queue_worker_justification");

		let (block_import_sender, block_import_port) =
			tracing_unbounded("mpsc_import_queue_worker_blocks");

		let mut worker = BlockImportWorker { result_sender, justification_import, metrics };

		let delay_between_blocks = Duration::default();

		let future = async move {
			// Let's initialize `justification_import`
			if let Some(justification_import) = worker.justification_import.as_mut() {
				for (hash, number) in justification_import.on_start().await {
					worker.result_sender.request_justification(&hash, number);
				}
			}

			let block_import_process = block_import_process(
				block_import,
				verifier,
				worker.result_sender.clone(),
				block_import_port,
				worker.metrics.clone(),
				delay_between_blocks,
			);
			futures::pin_mut!(block_import_process);

			loop {
				// If the results sender is closed, that means that the import queue is shutting
				// down and we should end this future.
				if worker.result_sender.is_closed() {
					log::debug!(
						target: LOG_TARGET,
						"Stopping block import because result channel was closed!",
					);
					return
				}

				// Make sure to first process all justifications
				while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
					match justification {
						Some(ImportJustification(who, hash, number, justification)) =>
							worker.import_justification(who, hash, number, justification).await,
						None => {
							log::debug!(
								target: LOG_TARGET,
								"Stopping block import because justification channel was closed!",
							);
							return
						},
					}
				}

				if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
					return
				}

				// All futures that we polled are now pending.
				futures::pending!()
			}
		};

		(future, justification_sender, block_import_sender)
	}

Implementors§