Trait sc_consensus::import_queue::Link
source · pub trait Link<B: BlockT>: Send {
fn blocks_processed(
&mut self,
_imported: usize,
_count: usize,
_results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>
) { ... }
fn justification_imported(
&mut self,
_who: RuntimeOrigin,
_hash: &B::Hash,
_number: NumberFor<B>,
_success: bool
) { ... }
fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>) { ... }
}Expand description
Hooks that the verification queue can use to influence the synchronization algorithm.
Provided Methods§
sourcefn blocks_processed(
&mut self,
_imported: usize,
_count: usize,
_results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>
)
fn blocks_processed(
&mut self,
_imported: usize,
_count: usize,
_results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>
)
Batch of blocks imported, with or without error.
Examples found in repository?
More examples
src/import_queue/basic_queue.rs (line 254)
223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256
async fn block_import_process<B: BlockT, Transaction: Send + 'static>(
mut block_import: BoxBlockImport<B, Transaction>,
mut verifier: impl Verifier<B>,
mut result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
delay_between_blocks: Duration,
) {
loop {
let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
{
Some(blocks) => blocks,
None => {
log::debug!(
target: LOG_TARGET,
"Stopping block import because the import channel was closed!",
);
return
},
};
let res = import_many_blocks(
&mut block_import,
origin,
blocks,
&mut verifier,
delay_between_blocks,
metrics.clone(),
)
.await;
result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
}sourcefn justification_imported(
&mut self,
_who: RuntimeOrigin,
_hash: &B::Hash,
_number: NumberFor<B>,
_success: bool
)
fn justification_imported(
&mut self,
_who: RuntimeOrigin,
_hash: &B::Hash,
_number: NumberFor<B>,
_success: bool
)
Justification import result.
Examples found in repository?
More examples
src/import_queue/basic_queue.rs (line 376)
344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377
async fn import_justification(
&mut self,
who: RuntimeOrigin,
hash: B::Hash,
number: NumberFor<B>,
justification: Justification,
) {
let started = std::time::Instant::now();
let success = match self.justification_import.as_mut() {
Some(justification_import) => justification_import
.import_justification(hash, number, justification)
.await
.map_err(|e| {
debug!(
target: LOG_TARGET,
"Justification import failed for hash = {:?} with number = {:?} coming from node = {:?} with error: {}",
hash,
number,
who,
e,
);
e
})
.is_ok(),
None => false,
};
if let Some(metrics) = self.metrics.as_ref() {
metrics.justification_import_time.observe(started.elapsed().as_secs_f64());
}
self.result_sender.justification_imported(who, &hash, number, success);
}sourcefn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>)
fn request_justification(&mut self, _hash: &B::Hash, _number: NumberFor<B>)
Request a justification for the given block.
Examples found in repository?
More examples
src/import_queue/basic_queue.rs (line 292)
265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342
fn new<V: 'static + Verifier<B>, Transaction: Send + 'static>(
result_sender: BufferedLinkSender<B>,
verifier: V,
block_import: BoxBlockImport<B, Transaction>,
justification_import: Option<BoxJustificationImport<B>>,
metrics: Option<Metrics>,
) -> (
impl Future<Output = ()> + Send,
TracingUnboundedSender<worker_messages::ImportJustification<B>>,
TracingUnboundedSender<worker_messages::ImportBlocks<B>>,
) {
use worker_messages::*;
let (justification_sender, mut justification_port) =
tracing_unbounded("mpsc_import_queue_worker_justification");
let (block_import_sender, block_import_port) =
tracing_unbounded("mpsc_import_queue_worker_blocks");
let mut worker = BlockImportWorker { result_sender, justification_import, metrics };
let delay_between_blocks = Duration::default();
let future = async move {
// Let's initialize `justification_import`
if let Some(justification_import) = worker.justification_import.as_mut() {
for (hash, number) in justification_import.on_start().await {
worker.result_sender.request_justification(&hash, number);
}
}
let block_import_process = block_import_process(
block_import,
verifier,
worker.result_sender.clone(),
block_import_port,
worker.metrics.clone(),
delay_between_blocks,
);
futures::pin_mut!(block_import_process);
loop {
// If the results sender is closed, that means that the import queue is shutting
// down and we should end this future.
if worker.result_sender.is_closed() {
log::debug!(
target: LOG_TARGET,
"Stopping block import because result channel was closed!",
);
return
}
// Make sure to first process all justifications
while let Poll::Ready(justification) = futures::poll!(justification_port.next()) {
match justification {
Some(ImportJustification(who, hash, number, justification)) =>
worker.import_justification(who, hash, number, justification).await,
None => {
log::debug!(
target: LOG_TARGET,
"Stopping block import because justification channel was closed!",
);
return
},
}
}
if let Poll::Ready(()) = futures::poll!(&mut block_import_process) {
return
}
// All futures that we polled are now pending.
futures::pending!()
}
};
(future, justification_sender, block_import_sender)
}