Skip to main content

pezsc_consensus_babe/
lib.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! # BABE (Blind Assignment for Blockchain Extension)
20//!
21//! BABE is a slot-based block production mechanism which uses a VRF PRNG to
22//! randomly perform the slot allocation. On every slot, all the authorities
23//! generate a new random number with the VRF function and if it is lower than a
24//! given threshold (which is proportional to their weight/stake) they have a
25//! right to produce a block. The proof of the VRF function execution will be
26//! used by other peer to validate the legitimacy of the slot claim.
27//!
28//! The engine is also responsible for collecting entropy on-chain which will be
29//! used to seed the given VRF PRNG. An epoch is a contiguous number of slots
30//! under which we will be using the same authority set. During an epoch all VRF
31//! outputs produced as a result of block production will be collected on an
32//! on-chain randomness pool. Epoch changes are announced one epoch in advance,
33//! i.e. when ending epoch N, we announce the parameters (randomness,
34//! authorities, etc.) for epoch N+2.
35//!
36//! Since the slot assignment is randomized, it is possible that a slot is
37//! assigned to multiple validators in which case we will have a temporary fork,
38//! or that a slot is assigned to no validator in which case no block is
39//! produced. Which means that block times are not deterministic.
40//!
41//! The protocol has a parameter `c` [0, 1] for which `1 - c` is the probability
42//! of a slot being empty. The choice of this parameter affects the security of
43//! the protocol relating to maximum tolerable network delays.
44//!
45//! In addition to the VRF-based slot assignment described above, which we will
46//! call primary slots, the engine also supports a deterministic secondary slot
47//! assignment. Primary slots take precedence over secondary slots, when
48//! authoring the node starts by trying to claim a primary slot and falls back
49//! to a secondary slot claim attempt. The secondary slot assignment is done
50//! by picking the authority at index:
51//!
52//! `blake2_256(epoch_randomness ++ slot_number) % authorities_len`.
53//!
54//! The secondary slots supports either a `SecondaryPlain` or `SecondaryVRF`
55//! variant. Comparing with `SecondaryPlain` variant, the `SecondaryVRF` variant
56//! generates an additional VRF output. The output is not included in beacon
57//! randomness, but can be consumed by teyrchains.
58//!
59//! The fork choice rule is weight-based, where weight equals the number of
60//! primary blocks in the chain. We will pick the heaviest chain (more primary
61//! blocks) and will go with the longest one in case of a tie.
62//!
63//! An in-depth description and analysis of the protocol can be found here:
64//! <https://research.web3.foundation/Polkadot/protocols/block-production/Babe>
65
66#![forbid(unsafe_code)]
67#![warn(missing_docs)]
68
69use std::{
70	collections::HashSet,
71	future::Future,
72	ops::{Deref, DerefMut},
73	pin::Pin,
74	sync::Arc,
75	task::{Context, Poll},
76	time::Duration,
77};
78
79use codec::{Decode, Encode};
80use futures::{
81	channel::{
82		mpsc::{channel, Receiver, Sender},
83		oneshot,
84	},
85	prelude::*,
86};
87use log::{debug, info, log, trace, warn};
88use parking_lot::Mutex;
89use prometheus_endpoint::Registry;
90
91use pezsc_client_api::{
92	backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
93	PreCommitActions, UsageProvider,
94};
95use pezsc_consensus::{
96	block_import::{
97		BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
98		StateAction,
99	},
100	import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
101};
102use pezsc_consensus_epochs::{
103	descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
104	ViableEpochDescriptor,
105};
106use pezsc_consensus_slots::{
107	check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
108	SlotInfo, StorageChanges,
109};
110use pezsc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
111use pezsc_transaction_pool_api::OffchainTransactionPoolFactory;
112use pezsp_api::{ApiExt, ProvideRuntimeApi};
113use pezsp_application_crypto::AppCrypto;
114use pezsp_block_builder::BlockBuilder as BlockBuilderApi;
115use pezsp_blockchain::{
116	Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
117	Result as ClientResult,
118};
119use pezsp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
120use pezsp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
121use pezsp_consensus_slots::Slot;
122use pezsp_core::traits::SpawnEssentialNamed;
123use pezsp_inherents::{CreateInherentDataProviders, InherentDataProvider};
124use pezsp_keystore::KeystorePtr;
125use pezsp_runtime::{
126	generic::OpaqueDigestItemId,
127	traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
128	DigestItem,
129};
130
131pub use pezsc_consensus_slots::SlotProportion;
132pub use pezsp_consensus::SyncOracle;
133pub use pezsp_consensus_babe::{
134	digests::{
135		CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
136		PrimaryPreDigest, SecondaryPlainPreDigest,
137	},
138	AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
139	BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
140};
141
142pub use aux_schema::load_block_weight as block_weight;
143use pezsp_timestamp::Timestamp;
144
145mod migration;
146mod verification;
147
148pub mod authorship;
149pub mod aux_schema;
150#[cfg(test)]
151mod tests;
152
153const LOG_TARGET: &str = "babe";
154
155/// VRF context used for slots claiming lottery.
156const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"bizinikiwi-babe-vrf";
157
158/// VRF output length for slots claiming lottery.
159const AUTHORING_SCORE_LENGTH: usize = 16;
160
161/// BABE epoch information
162#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
163pub struct Epoch(pezsp_consensus_babe::Epoch);
164
165impl Deref for Epoch {
166	type Target = pezsp_consensus_babe::Epoch;
167
168	fn deref(&self) -> &Self::Target {
169		&self.0
170	}
171}
172
173impl DerefMut for Epoch {
174	fn deref_mut(&mut self) -> &mut Self::Target {
175		&mut self.0
176	}
177}
178
179impl From<pezsp_consensus_babe::Epoch> for Epoch {
180	fn from(epoch: pezsp_consensus_babe::Epoch) -> Self {
181		Epoch(epoch)
182	}
183}
184
185impl EpochT for Epoch {
186	type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
187	type Slot = Slot;
188
189	fn increment(
190		&self,
191		(descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
192	) -> Epoch {
193		pezsp_consensus_babe::Epoch {
194			epoch_index: self.epoch_index + 1,
195			start_slot: self.start_slot + self.duration,
196			duration: self.duration,
197			authorities: descriptor.authorities,
198			randomness: descriptor.randomness,
199			config,
200		}
201		.into()
202	}
203
204	fn start_slot(&self) -> Slot {
205		self.start_slot
206	}
207
208	fn end_slot(&self) -> Slot {
209		self.start_slot + self.duration
210	}
211}
212
213impl Epoch {
214	/// Create the genesis epoch (epoch #0).
215	///
216	/// This is defined to start at the slot of the first block, so that has to be provided.
217	pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
218		pezsp_consensus_babe::Epoch {
219			epoch_index: 0,
220			start_slot: slot,
221			duration: genesis_config.epoch_length,
222			authorities: genesis_config.authorities.clone(),
223			randomness: genesis_config.randomness,
224			config: BabeEpochConfiguration {
225				c: genesis_config.c,
226				allowed_slots: genesis_config.allowed_slots,
227			},
228		}
229		.into()
230	}
231
232	/// Clone and tweak epoch information to refer to the specified slot.
233	///
234	/// All the information which depends on the slot value is recomputed and assigned
235	/// to the returned epoch instance.
236	///
237	/// The `slot` must be greater than or equal the original epoch start slot,
238	/// if is less this operation is equivalent to a simple clone.
239	pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
240		let mut epoch = self.clone();
241
242		let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;
243
244		let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
245			"epoch number is u64; it should be strictly smaller than number of slots; \
246				slots relate in some way to wall clock time; \
247				if u64 is not enough we should crash for safety; qed.",
248		);
249
250		let start_slot = skipped_epochs
251			.checked_mul(epoch.duration)
252			.and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
253			.expect(
254				"slot number is u64; it should relate in some way to wall clock time; \
255				 if u64 is not enough we should crash for safety; qed.",
256			);
257
258		epoch.epoch_index = epoch_index;
259		epoch.start_slot = Slot::from(start_slot);
260
261		epoch
262	}
263}
264
265/// Errors encountered by the babe authorship task.
266#[derive(Debug, thiserror::Error)]
267pub enum Error<B: BlockT> {
268	/// Multiple BABE pre-runtime digests
269	#[error("Multiple BABE pre-runtime digests, rejecting!")]
270	MultiplePreRuntimeDigests,
271	/// No BABE pre-runtime digest found
272	#[error("No BABE pre-runtime digest found")]
273	NoPreRuntimeDigest,
274	/// Multiple BABE epoch change digests
275	#[error("Multiple BABE epoch change digests, rejecting!")]
276	MultipleEpochChangeDigests,
277	/// Multiple BABE config change digests
278	#[error("Multiple BABE config change digests, rejecting!")]
279	MultipleConfigChangeDigests,
280	/// Could not extract timestamp and slot
281	#[error("Could not extract timestamp and slot: {0}")]
282	Extraction(ConsensusError),
283	/// Could not fetch epoch
284	#[error("Could not fetch epoch at {0:?}")]
285	FetchEpoch(B::Hash),
286	/// Header rejected: too far in the future
287	#[error("Header {0:?} rejected: too far in the future")]
288	TooFarInFuture(B::Hash),
289	/// Parent unavailable. Cannot import
290	#[error("Parent ({0}) of {1} unavailable. Cannot import")]
291	ParentUnavailable(B::Hash, B::Hash),
292	/// Slot number must increase
293	#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
294	SlotMustIncrease(Slot, Slot),
295	/// Header has a bad seal
296	#[error("Header {0:?} has a bad seal")]
297	HeaderBadSeal(B::Hash),
298	/// Header is unsealed
299	#[error("Header {0:?} is unsealed")]
300	HeaderUnsealed(B::Hash),
301	/// Slot author not found
302	#[error("Slot author not found")]
303	SlotAuthorNotFound,
304	/// Secondary slot assignments are disabled for the current epoch.
305	#[error("Secondary slot assignments are disabled for the current epoch.")]
306	SecondarySlotAssignmentsDisabled,
307	/// Bad signature
308	#[error("Bad signature on {0:?}")]
309	BadSignature(B::Hash),
310	/// Invalid author: Expected secondary author
311	#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
312	InvalidAuthor(AuthorityId, AuthorityId),
313	/// No secondary author expected.
314	#[error("No secondary author expected.")]
315	NoSecondaryAuthorExpected,
316	/// VRF verification failed
317	#[error("VRF verification failed")]
318	VrfVerificationFailed,
319	/// Primary slot threshold too low
320	#[error("VRF output rejected, threshold {0} exceeded")]
321	VrfThresholdExceeded(u128),
322	/// Could not fetch parent header
323	#[error("Could not fetch parent header: {0}")]
324	FetchParentHeader(pezsp_blockchain::Error),
325	/// Expected epoch change to happen.
326	#[error("Expected epoch change to happen at {0:?}, s{1}")]
327	ExpectedEpochChange(B::Hash, Slot),
328	/// Unexpected config change.
329	#[error("Unexpected config change")]
330	UnexpectedConfigChange,
331	/// Unexpected epoch change
332	#[error("Unexpected epoch change")]
333	UnexpectedEpochChange,
334	/// Parent block has no associated weight
335	#[error("Parent block of {0} has no associated weight")]
336	ParentBlockNoAssociatedWeight(B::Hash),
337	/// Check inherents error
338	#[error("Checking inherents failed: {0}")]
339	CheckInherents(pezsp_inherents::Error),
340	/// Unhandled check inherents error
341	#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
342	CheckInherentsUnhandled(pezsp_inherents::InherentIdentifier),
343	/// Create inherents error.
344	#[error("Creating inherents failed: {0}")]
345	CreateInherents(pezsp_inherents::Error),
346	/// Background worker is not running and therefore requests cannot be answered.
347	#[error("Background worker is not running")]
348	BackgroundWorkerTerminated,
349	/// Client error
350	#[error(transparent)]
351	Client(pezsp_blockchain::Error),
352	/// Runtime Api error.
353	#[error(transparent)]
354	RuntimeApi(pezsp_api::ApiError),
355	/// Fork tree error
356	#[error(transparent)]
357	ForkTree(Box<pez_fork_tree::Error<pezsp_blockchain::Error>>),
358}
359
360impl<B: BlockT> From<Error<B>> for String {
361	fn from(error: Error<B>) -> String {
362		error.to_string()
363	}
364}
365
366fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
367	debug!(target: LOG_TARGET, "{}", error);
368	error
369}
370
371/// Intermediate value passed to block importer.
372pub struct BabeIntermediate<B: BlockT> {
373	/// The epoch descriptor.
374	pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
375}
376
377/// Intermediate key for Babe engine.
378pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
379
380/// Read configuration from the runtime state at current best block.
381pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
382where
383	C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
384	C::Api: BabeApi<B>,
385{
386	let at_hash = if client.usage_info().chain.finalized_state.is_some() {
387		client.usage_info().chain.best_hash
388	} else {
389		debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
390		client.usage_info().chain.genesis_hash
391	};
392
393	let runtime_api = client.runtime_api();
394	let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
395
396	let config = match version {
397		Some(1) => {
398			#[allow(deprecated)]
399			{
400				runtime_api.configuration_before_version_2(at_hash)?.into()
401			}
402		},
403		Some(2) => runtime_api.configuration(at_hash)?,
404		_ => {
405			return Err(pezsp_blockchain::Error::VersionInvalid(
406				"Unsupported or invalid BabeApi version".to_string(),
407			))
408		},
409	};
410	Ok(config)
411}
412
413/// Parameters for BABE.
414pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
415	/// The keystore that manages the keys of the node.
416	pub keystore: KeystorePtr,
417
418	/// The client to use
419	pub client: Arc<C>,
420
421	/// The SelectChain Strategy
422	pub select_chain: SC,
423
424	/// The environment we are producing blocks for.
425	pub env: E,
426
427	/// The underlying block-import object to supply our produced blocks to.
428	/// This must be a `BabeBlockImport` or a wrapper of it, otherwise
429	/// critical consensus logic will be omitted.
430	pub block_import: I,
431
432	/// A sync oracle
433	pub sync_oracle: SO,
434
435	/// Hook into the sync module to control the justification sync process.
436	pub justification_sync_link: L,
437
438	/// Something that can create the inherent data providers.
439	pub create_inherent_data_providers: CIDP,
440
441	/// Force authoring of blocks even if we are offline
442	pub force_authoring: bool,
443
444	/// Strategy and parameters for backing off block production.
445	pub backoff_authoring_blocks: Option<BS>,
446
447	/// The source of timestamps for relative slots
448	pub babe_link: BabeLink<B>,
449
450	/// The proportion of the slot dedicated to proposing.
451	///
452	/// The block proposing will be limited to this proportion of the slot from the starting of the
453	/// slot. However, the proposing can still take longer when there is some lenience factor
454	/// applied, because there were no blocks produced for some slots.
455	pub block_proposal_slot_portion: SlotProportion,
456
457	/// The maximum proportion of the slot dedicated to proposing with any lenience factor applied
458	/// due to no blocks being produced.
459	pub max_block_proposal_slot_portion: Option<SlotProportion>,
460
461	/// Handle use to report telemetries.
462	pub telemetry: Option<TelemetryHandle>,
463}
464
465/// Start the babe worker.
466pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
467	BabeParams {
468		keystore,
469		client,
470		select_chain,
471		env,
472		block_import,
473		sync_oracle,
474		justification_sync_link,
475		create_inherent_data_providers,
476		force_authoring,
477		backoff_authoring_blocks,
478		babe_link,
479		block_proposal_slot_portion,
480		max_block_proposal_slot_portion,
481		telemetry,
482	}: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
483) -> Result<BabeWorker<B>, ConsensusError>
484where
485	B: BlockT,
486	C: ProvideRuntimeApi<B>
487		+ HeaderBackend<B>
488		+ HeaderMetadata<B, Error = ClientError>
489		+ Send
490		+ Sync
491		+ 'static,
492	C::Api: BabeApi<B>,
493	SC: SelectChain<B> + 'static,
494	E: Environment<B, Error = Error> + Send + Sync + 'static,
495	E::Proposer: Proposer<B, Error = Error>,
496	I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
497	SO: SyncOracle + Send + Sync + Clone + 'static,
498	L: pezsc_consensus::JustificationSyncLink<B> + 'static,
499	CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
500	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
501	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
502	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
503{
504	let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
505
506	let worker = BabeSlotWorker {
507		client: client.clone(),
508		block_import,
509		env,
510		sync_oracle: sync_oracle.clone(),
511		justification_sync_link,
512		force_authoring,
513		backoff_authoring_blocks,
514		keystore,
515		epoch_changes: babe_link.epoch_changes.clone(),
516		slot_notification_sinks: slot_notification_sinks.clone(),
517		config: babe_link.config.clone(),
518		block_proposal_slot_portion,
519		max_block_proposal_slot_portion,
520		telemetry,
521	};
522
523	info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");
524
525	let slot_worker = pezsc_consensus_slots::start_slot_worker(
526		babe_link.config.slot_duration(),
527		select_chain,
528		pezsc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
529		sync_oracle,
530		create_inherent_data_providers,
531	);
532
533	Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
534}
535
536// Remove obsolete block's weight data by leveraging finality notifications.
537// This includes data for all finalized blocks (excluding the most recent one)
538// and all stale branches.
539fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
540	client: &C,
541	notification: &FinalityNotification<Block>,
542) -> AuxDataOperations {
543	let mut hashes = HashSet::new();
544
545	let first = notification.tree_route.first().unwrap_or(&notification.hash);
546	match client.header_metadata(*first) {
547		Ok(meta) => {
548			hashes.insert(meta.parent);
549		},
550		Err(err) => {
551			warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
552		},
553	}
554
555	// Cleans data for finalized block's ancestors
556	hashes.extend(
557		notification
558			.tree_route
559			.iter()
560			// Ensure we don't prune latest finalized block.
561			// This should not happen, but better be safe than sorry!
562			.filter(|h| **h != notification.hash),
563	);
564
565	hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));
566
567	hashes
568		.into_iter()
569		.map(|val| (aux_schema::block_weight_key(val), None))
570		.collect()
571}
572
573async fn answer_requests<B: BlockT, C>(
574	mut request_rx: Receiver<BabeRequest<B>>,
575	config: BabeConfiguration,
576	client: Arc<C>,
577	epoch_changes: SharedEpochChanges<B, Epoch>,
578) where
579	C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
580{
581	while let Some(request) = request_rx.next().await {
582		match request {
583			BabeRequest::EpochData(response) => {
584				let _ = response.send(epoch_changes.shared_data().clone());
585			},
586			BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
587				let lookup = || {
588					let epoch_changes = epoch_changes.shared_data();
589					epoch_changes
590						.epoch_data_for_child_of(
591							descendent_query(&*client),
592							&parent_hash,
593							parent_number,
594							slot,
595							|slot| Epoch::genesis(&config, slot),
596						)
597						.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
598						.ok_or(Error::<B>::FetchEpoch(parent_hash))
599				};
600
601				let _ = response.send(lookup());
602			},
603		}
604	}
605}
606
607/// Requests to the BABE service.
608enum BabeRequest<B: BlockT> {
609	/// Request all available epoch data.
610	EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
611	/// Request the epoch that a child of the given block, with the given slot number would have.
612	///
613	/// The parent block is identified by its hash and number.
614	EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
615}
616
617/// A handle to the BABE worker for issuing requests.
618#[derive(Clone)]
619pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
620
621impl<B: BlockT> BabeWorkerHandle<B> {
622	async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
623		match self.0.clone().send(request).await {
624			Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
625			Err(err) => warn!(
626				target: LOG_TARGET,
627				"Unhandled error when sending request to worker: {:?}", err
628			),
629			_ => {},
630		}
631
632		Ok(())
633	}
634
635	/// Fetch all available epoch data.
636	pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
637		let (tx, rx) = oneshot::channel();
638		self.send_request(BabeRequest::EpochData(tx)).await?;
639
640		rx.await.or(Err(Error::BackgroundWorkerTerminated))
641	}
642
643	/// Fetch the epoch that a child of the given block, with the given slot number would have.
644	///
645	/// The parent block is identified by its hash and number.
646	pub async fn epoch_data_for_child_of(
647		&self,
648		parent_hash: B::Hash,
649		parent_number: NumberFor<B>,
650		slot: Slot,
651	) -> Result<Epoch, Error<B>> {
652		let (tx, rx) = oneshot::channel();
653		self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
654			.await?;
655
656		rx.await.or(Err(Error::BackgroundWorkerTerminated))?
657	}
658}
659
660/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
661#[must_use]
662pub struct BabeWorker<B: BlockT> {
663	inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
664	slot_notification_sinks: SlotNotificationSinks<B>,
665}
666
667impl<B: BlockT> BabeWorker<B> {
668	/// Return an event stream of notifications for when new slot happens, and the corresponding
669	/// epoch descriptor.
670	pub fn slot_notification_stream(
671		&self,
672	) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
673		const CHANNEL_BUFFER_SIZE: usize = 1024;
674
675		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
676		self.slot_notification_sinks.lock().push(sink);
677		stream
678	}
679}
680
681impl<B: BlockT> Future for BabeWorker<B> {
682	type Output = ();
683
684	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
685		self.inner.as_mut().poll(cx)
686	}
687}
688
689/// Slot notification sinks.
690type SlotNotificationSinks<B> = Arc<
691	Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
692>;
693
694struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
695	client: Arc<C>,
696	block_import: I,
697	env: E,
698	sync_oracle: SO,
699	justification_sync_link: L,
700	force_authoring: bool,
701	backoff_authoring_blocks: Option<BS>,
702	keystore: KeystorePtr,
703	epoch_changes: SharedEpochChanges<B, Epoch>,
704	slot_notification_sinks: SlotNotificationSinks<B>,
705	config: BabeConfiguration,
706	block_proposal_slot_portion: SlotProportion,
707	max_block_proposal_slot_portion: Option<SlotProportion>,
708	telemetry: Option<TelemetryHandle>,
709}
710
711#[async_trait::async_trait]
712impl<B, C, E, I, Error, SO, L, BS> pezsc_consensus_slots::SimpleSlotWorker<B>
713	for BabeSlotWorker<B, C, E, I, SO, L, BS>
714where
715	B: BlockT,
716	C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
717	C::Api: BabeApi<B>,
718	E: Environment<B, Error = Error> + Send + Sync,
719	E::Proposer: Proposer<B, Error = Error>,
720	I: BlockImport<B> + Send + Sync + 'static,
721	SO: SyncOracle + Send + Clone + Sync,
722	L: pezsc_consensus::JustificationSyncLink<B>,
723	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
724	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
725{
726	type Claim = (PreDigest, AuthorityId);
727	type SyncOracle = SO;
728	type JustificationSyncLink = L;
729	type CreateProposer =
730		Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
731	type Proposer = E::Proposer;
732	type BlockImport = I;
733	type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;
734
735	fn logging_target(&self) -> &'static str {
736		LOG_TARGET
737	}
738
739	fn block_import(&mut self) -> &mut Self::BlockImport {
740		&mut self.block_import
741	}
742
743	fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
744		self.epoch_changes
745			.shared_data()
746			.epoch_descriptor_for_child_of(
747				descendent_query(&*self.client),
748				&parent.hash(),
749				*parent.number(),
750				slot,
751			)
752			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
753			.ok_or(ConsensusError::InvalidAuthoritiesSet)
754	}
755
756	fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
757		self.epoch_changes
758			.shared_data()
759			.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
760			.map(|epoch| epoch.as_ref().authorities.len())
761	}
762
763	async fn claim_slot(
764		&mut self,
765		_parent_header: &B::Header,
766		slot: Slot,
767		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
768	) -> Option<Self::Claim> {
769		debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
770		let s = authorship::claim_slot(
771			slot,
772			self.epoch_changes
773				.shared_data()
774				.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
775				.as_ref(),
776			&self.keystore,
777		);
778
779		if s.is_some() {
780			debug!(target: LOG_TARGET, "Claimed slot {}", slot);
781		}
782
783		s
784	}
785
786	fn notify_slot(
787		&self,
788		_parent_header: &B::Header,
789		slot: Slot,
790		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
791	) {
792		let sinks = &mut self.slot_notification_sinks.lock();
793		sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
794			Ok(()) => true,
795			Err(e) => {
796				if e.is_full() {
797					warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
798					true
799				} else {
800					false
801				}
802			},
803		});
804	}
805
806	fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<pezsp_runtime::DigestItem> {
807		vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
808	}
809
810	async fn block_import_params(
811		&self,
812		header: B::Header,
813		header_hash: &B::Hash,
814		body: Vec<B::Extrinsic>,
815		storage_changes: StorageChanges<B>,
816		(_, public): Self::Claim,
817		epoch_descriptor: Self::AuxData,
818	) -> Result<BlockImportParams<B>, ConsensusError> {
819		let signature = self
820			.keystore
821			.sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
822			.map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
823			.ok_or_else(|| {
824				ConsensusError::CannotSign(format!(
825					"Could not find key in keystore. Key: {:?}",
826					public
827				))
828			})?;
829
830		let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());
831
832		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
833		import_block.post_digests.push(digest_item);
834		import_block.body = Some(body);
835		import_block.state_action =
836			StateAction::ApplyChanges(pezsc_consensus::StorageChanges::Changes(storage_changes));
837		import_block
838			.insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });
839
840		Ok(import_block)
841	}
842
843	fn force_authoring(&self) -> bool {
844		self.force_authoring
845	}
846
847	fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
848		if let Some(ref strategy) = self.backoff_authoring_blocks {
849			if let Ok(chain_head_slot) =
850				find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
851			{
852				return strategy.should_backoff(
853					*chain_head.number(),
854					chain_head_slot,
855					self.client.info().finalized_number,
856					slot,
857					self.logging_target(),
858				);
859			}
860		}
861		false
862	}
863
864	fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
865		&mut self.sync_oracle
866	}
867
868	fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
869		&mut self.justification_sync_link
870	}
871
872	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
873		Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
874	}
875
876	fn telemetry(&self) -> Option<TelemetryHandle> {
877		self.telemetry.clone()
878	}
879
880	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
881		let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());
882
883		pezsc_consensus_slots::proposing_remaining_duration(
884			parent_slot,
885			slot_info,
886			&self.block_proposal_slot_portion,
887			self.max_block_proposal_slot_portion.as_ref(),
888			pezsc_consensus_slots::SlotLenienceType::Exponential,
889			self.logging_target(),
890		)
891	}
892}
893
894/// Extract the BABE pre digest from the given header. Pre-runtime digests are
895/// mandatory, the function will return `Err` if none is found.
896pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
897	// genesis block doesn't contain a pre digest so let's generate a
898	// dummy one to not break any invariants in the rest of the code
899	if header.number().is_zero() {
900		return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
901			slot: 0.into(),
902			authority_index: 0,
903		}));
904	}
905
906	let mut pre_digest: Option<_> = None;
907	for log in header.digest().logs() {
908		trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
909		match (log.as_babe_pre_digest(), pre_digest.is_some()) {
910			(Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
911			(None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
912			(s, false) => pre_digest = s,
913		}
914	}
915	pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
916}
917
918/// Check whether the given header contains a BABE epoch change digest.
919pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
920	find_next_epoch_digest::<B>(header).ok().flatten().is_some()
921}
922
923/// Extract the BABE epoch change digest from the given header, if it exists.
924pub fn find_next_epoch_digest<B: BlockT>(
925	header: &B::Header,
926) -> Result<Option<NextEpochDescriptor>, Error<B>> {
927	let mut epoch_digest: Option<_> = None;
928	for log in header.digest().logs() {
929		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
930		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
931		match (log, epoch_digest.is_some()) {
932			(Some(ConsensusLog::NextEpochData(_)), true) => {
933				return Err(babe_err(Error::MultipleEpochChangeDigests))
934			},
935			(Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
936			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
937		}
938	}
939
940	Ok(epoch_digest)
941}
942
943/// Extract the BABE config change digest from the given header, if it exists.
944fn find_next_config_digest<B: BlockT>(
945	header: &B::Header,
946) -> Result<Option<NextConfigDescriptor>, Error<B>> {
947	let mut config_digest: Option<_> = None;
948	for log in header.digest().logs() {
949		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
950		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
951		match (log, config_digest.is_some()) {
952			(Some(ConsensusLog::NextConfigData(_)), true) => {
953				return Err(babe_err(Error::MultipleConfigChangeDigests))
954			},
955			(Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
956			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
957		}
958	}
959
960	Ok(config_digest)
961}
962
963/// State that must be shared between the import queue and the authoring logic.
964#[derive(Clone)]
965pub struct BabeLink<Block: BlockT> {
966	epoch_changes: SharedEpochChanges<Block, Epoch>,
967	config: BabeConfiguration,
968}
969
970impl<Block: BlockT> BabeLink<Block> {
971	/// Get the epoch changes of this link.
972	pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
973		&self.epoch_changes
974	}
975
976	/// Get the config of this link.
977	pub fn config(&self) -> &BabeConfiguration {
978		&self.config
979	}
980}
981
982/// A verifier for Babe blocks.
983pub struct BabeVerifier<Block: BlockT, Client> {
984	client: Arc<Client>,
985	slot_duration: SlotDuration,
986	config: BabeConfiguration,
987	epoch_changes: SharedEpochChanges<Block, Epoch>,
988	telemetry: Option<TelemetryHandle>,
989}
990
991#[async_trait::async_trait]
992impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
993where
994	Block: BlockT,
995	Client: HeaderMetadata<Block, Error = pezsp_blockchain::Error>
996		+ HeaderBackend<Block>
997		+ ProvideRuntimeApi<Block>
998		+ Send
999		+ Sync
1000		+ AuxStore,
1001	Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
1002{
1003	async fn verify(
1004		&self,
1005		mut block: BlockImportParams<Block>,
1006	) -> Result<BlockImportParams<Block>, String> {
1007		trace!(
1008			target: LOG_TARGET,
1009			"Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
1010			block.origin,
1011			block.header,
1012			block.justifications,
1013			block.body,
1014		);
1015
1016		let hash = block.header.hash();
1017		let parent_hash = *block.header.parent_hash();
1018
1019		let number = block.header.number();
1020
1021		if is_state_sync_or_gap_sync_import(&*self.client, &block) {
1022			return Ok(block);
1023		}
1024
1025		debug!(
1026			target: LOG_TARGET,
1027			"We have {:?} logs in this header",
1028			block.header.digest().logs().len()
1029		);
1030
1031		let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);
1032
1033		let pre_digest = find_pre_digest::<Block>(&block.header)?;
1034		let (check_header, epoch_descriptor) = {
1035			let (epoch_descriptor, viable_epoch) = query_epoch_changes(
1036				&self.epoch_changes,
1037				self.client.as_ref(),
1038				&self.config,
1039				*number,
1040				pre_digest.slot(),
1041				parent_hash,
1042			)?;
1043
1044			// We add one to the current slot to allow for some small drift.
1045			// FIXME #1019 in the future, alter this queue to allow deferring of headers
1046			let v_params = verification::VerificationParams {
1047				header: block.header.clone(),
1048				pre_digest: Some(pre_digest),
1049				slot_now: slot_now + 1,
1050				epoch: viable_epoch.as_ref(),
1051			};
1052
1053			(verification::check_header::<Block>(v_params)?, epoch_descriptor)
1054		};
1055
1056		match check_header {
1057			CheckedHeader::Checked(pre_header, verified_info) => {
1058				trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
1059				telemetry!(
1060					self.telemetry;
1061					CONSENSUS_TRACE;
1062					"babe.checked_and_importing";
1063					"pre_header" => ?pre_header,
1064				);
1065
1066				block.header = pre_header;
1067				block.post_digests.push(verified_info.seal);
1068				block.insert_intermediate(
1069					INTERMEDIATE_KEY,
1070					BabeIntermediate::<Block> { epoch_descriptor },
1071				);
1072				block.post_hash = Some(hash);
1073
1074				Ok(block)
1075			},
1076			CheckedHeader::Deferred(a, b) => {
1077				debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
1078				telemetry!(
1079					self.telemetry;
1080					CONSENSUS_DEBUG;
1081					"babe.header_too_far_in_future";
1082					"hash" => ?hash, "a" => ?a, "b" => ?b
1083				);
1084				Err(Error::<Block>::TooFarInFuture(hash).into())
1085			},
1086		}
1087	}
1088}
1089
1090/// Verification for imported blocks is skipped in two cases:
1091/// 1. When importing blocks below the last finalized block during network initial synchronization.
1092/// 2. When importing whole state we don't calculate epoch descriptor, but rather read it from the
1093///    state after import. We also skip all verifications because there's no parent state and we
1094///    trust the sync module to verify that the state is correct and finalized.
1095fn is_state_sync_or_gap_sync_import<B: BlockT>(
1096	client: &impl HeaderBackend<B>,
1097	block: &BlockImportParams<B>,
1098) -> bool {
1099	let number = *block.header.number();
1100	let info = client.info();
1101	info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
1102		|| block.with_state()
1103}
1104
1105/// A block-import handler for BABE.
1106///
1107/// This scans each imported block for epoch change signals. The signals are
1108/// tracked in a tree (of all forks), and the import logic validates all epoch
1109/// change transitions, i.e. whether a given epoch change is expected or whether
1110/// it is missing.
1111///
1112/// The epoch change tree should be pruned as blocks are finalized.
1113pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
1114	inner: I,
1115	client: Arc<Client>,
1116	epoch_changes: SharedEpochChanges<Block, Epoch>,
1117	create_inherent_data_providers: CIDP,
1118	config: BabeConfiguration,
1119	// A [`SelectChain`] implementation.
1120	//
1121	// Used to determine the best block that should be used as basis when sending an equivocation
1122	// report.
1123	select_chain: SC,
1124	// The offchain transaction pool factory.
1125	//
1126	// Will be used when sending equivocation reports.
1127	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1128}
1129
1130impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
1131	for BabeBlockImport<Block, Client, I, CIDP, SC>
1132{
1133	fn clone(&self) -> Self {
1134		BabeBlockImport {
1135			inner: self.inner.clone(),
1136			client: self.client.clone(),
1137			epoch_changes: self.epoch_changes.clone(),
1138			config: self.config.clone(),
1139			create_inherent_data_providers: self.create_inherent_data_providers.clone(),
1140			select_chain: self.select_chain.clone(),
1141			offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
1142		}
1143	}
1144}
1145
1146impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
1147	fn new(
1148		client: Arc<Client>,
1149		epoch_changes: SharedEpochChanges<Block, Epoch>,
1150		block_import: I,
1151		config: BabeConfiguration,
1152		create_inherent_data_providers: CIDP,
1153		select_chain: SC,
1154		offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1155	) -> Self {
1156		BabeBlockImport {
1157			client,
1158			inner: block_import,
1159			epoch_changes,
1160			config,
1161			create_inherent_data_providers,
1162			select_chain,
1163			offchain_tx_pool_factory,
1164		}
1165	}
1166}
1167
1168impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
1169where
1170	Block: BlockT,
1171	Inner: BlockImport<Block> + Send + Sync,
1172	Inner::Error: Into<ConsensusError>,
1173	Client: HeaderBackend<Block>
1174		+ HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1175		+ AuxStore
1176		+ ProvideRuntimeApi<Block>
1177		+ Send
1178		+ Sync,
1179	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1180	CIDP: CreateInherentDataProviders<Block, ()>,
1181	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
1182	SC: pezsp_consensus::SelectChain<Block> + 'static,
1183{
1184	/// Import whole state after warp sync.
1185	// This function makes multiple transactions to the DB. If one of them fails we may
1186	// end up in an inconsistent state and have to resync.
1187	async fn import_state(
1188		&self,
1189		mut block: BlockImportParams<Block>,
1190	) -> Result<ImportResult, ConsensusError> {
1191		let hash = block.post_hash();
1192		let parent_hash = *block.header.parent_hash();
1193		let number = *block.header.number();
1194
1195		block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
1196		// Reset block weight.
1197		aux_schema::write_block_weight(hash, 0, |values| {
1198			block
1199				.auxiliary
1200				.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1201		});
1202
1203		// First make the client import the state.
1204		let import_result = self.inner.import_block(block).await;
1205		let aux = match import_result {
1206			Ok(ImportResult::Imported(aux)) => aux,
1207			Ok(r) => {
1208				return Err(ConsensusError::ClientImport(format!(
1209					"Unexpected import result: {:?}",
1210					r
1211				)))
1212			},
1213			Err(r) => return Err(r.into()),
1214		};
1215
1216		// Read epoch info from the imported state.
1217		let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
1218			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1219		})?;
1220		let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
1221			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1222		})?;
1223
1224		let mut epoch_changes = self.epoch_changes.shared_data_locked();
1225		epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
1226		aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
1227			self.client.insert_aux(insert, [])
1228		})
1229		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1230
1231		Ok(ImportResult::Imported(aux))
1232	}
1233
1234	/// Check the inherents and equivocations.
1235	async fn check_inherents_and_equivocations(
1236		&self,
1237		block: &mut BlockImportParams<Block>,
1238	) -> Result<(), ConsensusError> {
1239		if is_state_sync_or_gap_sync_import(&*self.client, block) {
1240			return Ok(());
1241		}
1242
1243		let parent_hash = *block.header.parent_hash();
1244		let number = *block.header.number();
1245
1246		let create_inherent_data_providers = self
1247			.create_inherent_data_providers
1248			.create_inherent_data_providers(parent_hash, ())
1249			.await?;
1250
1251		let slot_now = create_inherent_data_providers.slot();
1252
1253		let babe_pre_digest = find_pre_digest::<Block>(&block.header)
1254			.map_err(|e| ConsensusError::Other(Box::new(e)))?;
1255		let slot = babe_pre_digest.slot();
1256
1257		// Check inherents.
1258		self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
1259			.await?;
1260
1261		// Check for equivocation and report it to the runtime if needed.
1262		let author = {
1263			let viable_epoch = query_epoch_changes(
1264				&self.epoch_changes,
1265				self.client.as_ref(),
1266				&self.config,
1267				number,
1268				slot,
1269				parent_hash,
1270			)
1271			.map_err(|e| ConsensusError::Other(babe_err(e).into()))?
1272			.1;
1273			match viable_epoch
1274				.as_ref()
1275				.authorities
1276				.get(babe_pre_digest.authority_index() as usize)
1277			{
1278				Some(author) => author.0.clone(),
1279				None => {
1280					return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into()))
1281				},
1282			}
1283		};
1284		if let Err(err) = self
1285			.check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
1286			.await
1287		{
1288			warn!(
1289				target: LOG_TARGET,
1290				"Error checking/reporting BABE equivocation: {}", err
1291			);
1292		}
1293		Ok(())
1294	}
1295
1296	async fn check_inherents(
1297		&self,
1298		block: &mut BlockImportParams<Block>,
1299		at_hash: Block::Hash,
1300		slot: Slot,
1301		create_inherent_data_providers: CIDP::InherentDataProviders,
1302	) -> Result<(), ConsensusError> {
1303		if block.state_action.skip_execution_checks() {
1304			return Ok(());
1305		}
1306
1307		if let Some(inner_body) = block.body.take() {
1308			let new_block = Block::new(block.header.clone(), inner_body);
1309			// if the body is passed through and the block was executed,
1310			// we need to use the runtime to check that the internally-set
1311			// timestamp in the inherents actually matches the slot set in the seal.
1312			let mut inherent_data = create_inherent_data_providers
1313				.create_inherent_data()
1314				.await
1315				.map_err(|e| ConsensusError::Other(Box::new(e)))?;
1316			inherent_data.babe_replace_inherent_data(slot);
1317
1318			use pezsp_block_builder::CheckInherentsError;
1319
1320			pezsp_block_builder::check_inherents_with_data(
1321				self.client.clone(),
1322				at_hash,
1323				new_block.clone(),
1324				&create_inherent_data_providers,
1325				inherent_data,
1326			)
1327			.await
1328			.map_err(|e| {
1329				ConsensusError::Other(Box::new(match e {
1330					CheckInherentsError::CreateInherentData(e) => {
1331						Error::<Block>::CreateInherents(e)
1332					},
1333					CheckInherentsError::Client(e) => Error::RuntimeApi(e),
1334					CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
1335					CheckInherentsError::CheckInherentsUnknownError(id) => {
1336						Error::CheckInherentsUnhandled(id)
1337					},
1338				}))
1339			})?;
1340			let (_, inner_body) = new_block.deconstruct();
1341			block.body = Some(inner_body);
1342		}
1343
1344		Ok(())
1345	}
1346
1347	async fn check_and_report_equivocation(
1348		&self,
1349		slot_now: Slot,
1350		slot: Slot,
1351		header: &Block::Header,
1352		author: &AuthorityId,
1353		origin: &BlockOrigin,
1354	) -> Result<(), Error<Block>> {
1355		// don't report any equivocations during initial sync
1356		// as they are most likely stale.
1357		if *origin == BlockOrigin::NetworkInitialSync {
1358			return Ok(());
1359		}
1360
1361		// check if authorship of this header is an equivocation and return a proof if so.
1362		let Some(equivocation_proof) =
1363			check_equivocation(&*self.client, slot_now, slot, header, author)
1364				.map_err(Error::Client)?
1365		else {
1366			return Ok(());
1367		};
1368
1369		info!(
1370			"Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
1371			author,
1372			slot,
1373			equivocation_proof.first_header.hash(),
1374			equivocation_proof.second_header.hash(),
1375		);
1376
1377		// get the best block on which we will build and send the equivocation report.
1378		let best_hash = self
1379			.select_chain
1380			.best_chain()
1381			.await
1382			.map(|h| h.hash())
1383			.map_err(|e| Error::Client(e.into()))?;
1384
1385		// generate a key ownership proof. we start by trying to generate the
1386		// key ownership proof at the parent of the equivocating header, this
1387		// will make sure that proof generation is successful since it happens
1388		// during the on-going session (i.e. session keys are available in the
1389		// state to be able to generate the proof). this might fail if the
1390		// equivocation happens on the first block of the session, in which case
1391		// its parent would be on the previous session. if generation on the
1392		// parent header fails we try with best block as well.
1393		let generate_key_owner_proof = |at_hash: Block::Hash| {
1394			self.client
1395				.runtime_api()
1396				.generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
1397				.map_err(Error::RuntimeApi)
1398		};
1399
1400		let parent_hash = *header.parent_hash();
1401		let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
1402			Some(proof) => proof,
1403			None => match generate_key_owner_proof(best_hash)? {
1404				Some(proof) => proof,
1405				None => {
1406					debug!(
1407						target: LOG_TARGET,
1408						"Equivocation offender is not part of the authority set."
1409					);
1410					return Ok(());
1411				},
1412			},
1413		};
1414
1415		// submit equivocation report at best block.
1416		let mut runtime_api = self.client.runtime_api();
1417
1418		// Register the offchain tx pool to be able to use it from the runtime.
1419		runtime_api
1420			.register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
1421
1422		runtime_api
1423			.submit_report_equivocation_unsigned_extrinsic(
1424				best_hash,
1425				equivocation_proof,
1426				key_owner_proof,
1427			)
1428			.map_err(Error::RuntimeApi)?;
1429
1430		info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);
1431
1432		Ok(())
1433	}
1434}
1435
1436#[async_trait::async_trait]
1437impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
1438	for BabeBlockImport<Block, Client, Inner, CIDP, SC>
1439where
1440	Block: BlockT,
1441	Inner: BlockImport<Block> + Send + Sync,
1442	Inner::Error: Into<ConsensusError>,
1443	Client: HeaderBackend<Block>
1444		+ HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1445		+ AuxStore
1446		+ ProvideRuntimeApi<Block>
1447		+ Send
1448		+ Sync,
1449	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1450	CIDP: CreateInherentDataProviders<Block, ()>,
1451	CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
1452	SC: SelectChain<Block> + 'static,
1453{
1454	type Error = ConsensusError;
1455
1456	async fn import_block(
1457		&self,
1458		mut block: BlockImportParams<Block>,
1459	) -> Result<ImportResult, Self::Error> {
1460		let hash = block.post_hash();
1461		let number = *block.header.number();
1462		let info = self.client.info();
1463
1464		self.check_inherents_and_equivocations(&mut block).await?;
1465
1466		let block_status = self
1467			.client
1468			.status(hash)
1469			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1470
1471		// Skip babe logic if block already in chain or importing blocks during initial sync,
1472		// otherwise the check for epoch changes will error because trying to re-import an
1473		// epoch change or because of missing epoch data in the tree, respectively.
1474		if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
1475			|| block_status == BlockStatus::InChain
1476		{
1477			// When re-importing existing block strip away intermediates.
1478			// In case of initial sync intermediates should not be present...
1479			let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
1480			block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
1481			return self.inner.import_block(block).await.map_err(Into::into);
1482		}
1483
1484		if block.with_state() {
1485			return self.import_state(block).await;
1486		}
1487
1488		let pre_digest = find_pre_digest::<Block>(&block.header).expect(
1489			"valid babe headers must contain a predigest; header has been already verified; qed",
1490		);
1491		let slot = pre_digest.slot();
1492
1493		let parent_hash = *block.header.parent_hash();
1494		let parent_header = self
1495			.client
1496			.header(parent_hash)
1497			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
1498			.ok_or_else(|| {
1499				ConsensusError::ChainLookup(
1500					babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
1501				)
1502			})?;
1503
1504		let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
1505			"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
1506			 been verified; qed",
1507		);
1508
1509		// make sure that slot number is strictly increasing
1510		if slot <= parent_slot {
1511			return Err(ConsensusError::ClientImport(
1512				babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
1513			));
1514		}
1515
1516		// if there's a pending epoch we'll save the previous epoch changes here
1517		// this way we can revert it if there's any error
1518		let mut old_epoch_changes = None;
1519
1520		// Use an extra scope to make the compiler happy, because otherwise it complains about the
1521		// mutex, even if we dropped it...
1522		let mut epoch_changes = {
1523			let mut epoch_changes = self.epoch_changes.shared_data_locked();
1524
1525			// check if there's any epoch change expected to happen at this slot.
1526			// `epoch` is the epoch to verify the block under, and `first_in_epoch` is true
1527			// if this is the first block in its chain for that epoch.
1528			//
1529			// also provides the total weight of the chain, including the imported block.
1530			let (epoch_descriptor, first_in_epoch, parent_weight) = {
1531				let parent_weight = if *parent_header.number() == Zero::zero() {
1532					0
1533				} else {
1534					aux_schema::load_block_weight(&*self.client, parent_hash)
1535						.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1536						.ok_or_else(|| {
1537							ConsensusError::ClientImport(
1538								babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
1539									.into(),
1540							)
1541						})?
1542				};
1543
1544				let intermediate =
1545					block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;
1546
1547				let epoch_descriptor = intermediate.epoch_descriptor;
1548				let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
1549				(epoch_descriptor, first_in_epoch, parent_weight)
1550			};
1551
1552			let total_weight = parent_weight + pre_digest.added_weight();
1553
1554			// search for this all the time so we can reject unexpected announcements.
1555			let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
1556				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1557			let next_config_digest = find_next_config_digest::<Block>(&block.header)
1558				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1559
1560			match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
1561				(true, true, _) => {},
1562				(false, false, false) => {},
1563				(false, false, true) => {
1564					return Err(ConsensusError::ClientImport(
1565						babe_err(Error::<Block>::UnexpectedConfigChange).into(),
1566					))
1567				},
1568				(true, false, _) => {
1569					return Err(ConsensusError::ClientImport(
1570						babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
1571					))
1572				},
1573				(false, true, _) => {
1574					return Err(ConsensusError::ClientImport(
1575						babe_err(Error::<Block>::UnexpectedEpochChange).into(),
1576					))
1577				},
1578			}
1579
1580			if let Some(next_epoch_descriptor) = next_epoch_digest {
1581				old_epoch_changes = Some((*epoch_changes).clone());
1582
1583				let mut viable_epoch = epoch_changes
1584					.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
1585					.ok_or_else(|| {
1586						ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
1587					})?
1588					.into_cloned();
1589
1590				let epoch_config = next_config_digest
1591					.map(Into::into)
1592					.unwrap_or_else(|| viable_epoch.as_ref().config.clone());
1593
1594				// restrict info logging during initial sync to avoid spam
1595				let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
1596					log::Level::Debug
1597				} else {
1598					log::Level::Info
1599				};
1600
1601				if viable_epoch.as_ref().end_slot() <= slot {
1602					// Some epochs must have been skipped as our current slot fits outside the
1603					// current epoch. We will figure out which epoch it belongs to and we will
1604					// re-use the same data for that epoch.
1605					// Notice that we are only updating a local copy of the `Epoch`, this
1606					// makes it so that when we insert the next epoch into `EpochChanges` below
1607					// (after incrementing it), it will use the correct epoch index and start slot.
1608					// We do not update the original epoch that will be re-used because there might
1609					// be other forks (that we haven't imported) where the epoch isn't skipped, and
1610					// to import those forks we want to keep the original epoch data. Not updating
1611					// the original epoch works because when we search the tree for which epoch to
1612					// use for a given slot, we will search in-depth with the predicate
1613					// `epoch.start_slot <= slot` which will still match correctly without updating
1614					// `start_slot` to the correct value as below.
1615					let epoch = viable_epoch.as_mut();
1616					let prev_index = epoch.epoch_index;
1617					*epoch = epoch.clone_for_slot(slot);
1618
1619					warn!(
1620						target: LOG_TARGET,
1621						"👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
1622					);
1623				}
1624
1625				log!(
1626					target: LOG_TARGET,
1627					log_level,
1628					"👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
1629					viable_epoch.as_ref().epoch_index,
1630					hash,
1631					slot,
1632					viable_epoch.as_ref().start_slot,
1633				);
1634
1635				let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));
1636
1637				log!(
1638					target: LOG_TARGET,
1639					log_level,
1640					"👶 Next epoch starts at slot {}",
1641					next_epoch.as_ref().start_slot,
1642				);
1643
1644				// prune the tree of epochs not part of the finalized chain or
1645				// that are not live anymore, and then track the given epoch change
1646				// in the tree.
1647				// NOTE: it is important that these operations are done in this
1648				// order, otherwise if pruning after import the `is_descendent_of`
1649				// used by pruning may not know about the block that is being
1650				// imported.
1651				let prune_and_import = || {
1652					prune_finalized(self.client.clone(), &mut epoch_changes)?;
1653
1654					epoch_changes
1655						.import(
1656							descendent_query(&*self.client),
1657							hash,
1658							number,
1659							*block.header.parent_hash(),
1660							next_epoch,
1661						)
1662						.map_err(|e| {
1663							ConsensusError::ClientImport(format!(
1664								"Error importing epoch changes: {}",
1665								e
1666							))
1667						})?;
1668					Ok(())
1669				};
1670
1671				if let Err(e) = prune_and_import() {
1672					debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
1673					*epoch_changes =
1674						old_epoch_changes.expect("set `Some` above and not taken; qed");
1675					return Err(e);
1676				}
1677
1678				crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
1679					block
1680						.auxiliary
1681						.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1682				});
1683			}
1684
1685			aux_schema::write_block_weight(hash, total_weight, |values| {
1686				block
1687					.auxiliary
1688					.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1689			});
1690
1691			// The fork choice rule is that we pick the heaviest chain (i.e.
1692			// more primary blocks), if there's a tie we go with the longest
1693			// chain.
1694			block.fork_choice = {
1695				let (last_best, last_best_number) = (info.best_hash, info.best_number);
1696
1697				let last_best_weight = if &last_best == block.header.parent_hash() {
1698					// the parent=genesis case is already covered for loading parent weight,
1699					// so we don't need to cover again here.
1700					parent_weight
1701				} else {
1702					aux_schema::load_block_weight(&*self.client, last_best)
1703						.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
1704						.ok_or_else(|| {
1705							ConsensusError::ChainLookup(
1706								"No block weight for parent header.".to_string(),
1707							)
1708						})?
1709				};
1710
1711				Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
1712					true
1713				} else if total_weight == last_best_weight {
1714					number > last_best_number
1715				} else {
1716					false
1717				}))
1718			};
1719
1720			// Release the mutex, but it stays locked
1721			epoch_changes.release_mutex()
1722		};
1723
1724		let import_result = self.inner.import_block(block).await;
1725
1726		// revert to the original epoch changes in case there's an error
1727		// importing the block
1728		if import_result.is_err() {
1729			if let Some(old_epoch_changes) = old_epoch_changes {
1730				*epoch_changes.upgrade() = old_epoch_changes;
1731			}
1732		}
1733
1734		import_result.map_err(Into::into)
1735	}
1736
1737	async fn check_block(
1738		&self,
1739		block: BlockCheckParams<Block>,
1740	) -> Result<ImportResult, Self::Error> {
1741		self.inner.check_block(block).await.map_err(Into::into)
1742	}
1743}
1744
1745/// Gets the best finalized block and its slot, and prunes the given epoch tree.
1746fn prune_finalized<Block, Client>(
1747	client: Arc<Client>,
1748	epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1749) -> Result<(), ConsensusError>
1750where
1751	Block: BlockT,
1752	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = pezsp_blockchain::Error>,
1753{
1754	let info = client.info();
1755
1756	let finalized_slot = {
1757		let finalized_header = client
1758			.header(info.finalized_hash)
1759			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1760			.expect(
1761				"best finalized hash was given by client; finalized headers must exist in db; qed",
1762			);
1763
1764		find_pre_digest::<Block>(&finalized_header)
1765			.expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1766			.slot()
1767	};
1768
1769	epoch_changes
1770		.prune_finalized(
1771			descendent_query(&*client),
1772			&info.finalized_hash,
1773			info.finalized_number,
1774			finalized_slot,
1775		)
1776		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1777
1778	Ok(())
1779}
1780
1781/// Produce a BABE block-import object to be used later on in the construction of
1782/// an import-queue.
1783///
1784/// Also returns a link object used to correctly instantiate the import queue
1785/// and background worker.
1786pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1787	config: BabeConfiguration,
1788	wrapped_block_import: I,
1789	client: Arc<Client>,
1790	create_inherent_data_providers: CIDP,
1791	select_chain: SC,
1792	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1793) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1794where
1795	Client: AuxStore
1796		+ HeaderBackend<Block>
1797		+ HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1798		+ PreCommitActions<Block>
1799		+ 'static,
1800{
1801	let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1802	let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1803
1804	// NOTE: this isn't entirely necessary, but since we didn't use to prune the
1805	// epoch tree it is useful as a migration, so that nodes prune long trees on
1806	// startup rather than waiting until importing the next epoch change block.
1807	prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1808
1809	let client_weak = Arc::downgrade(&client);
1810	let on_finality = move |summary: &FinalityNotification<Block>| {
1811		if let Some(client) = client_weak.upgrade() {
1812			aux_storage_cleanup(client.as_ref(), summary)
1813		} else {
1814			Default::default()
1815		}
1816	};
1817	client.register_finality_action(Box::new(on_finality));
1818
1819	let import = BabeBlockImport::new(
1820		client,
1821		epoch_changes,
1822		wrapped_block_import,
1823		config,
1824		create_inherent_data_providers,
1825		select_chain,
1826		offchain_tx_pool_factory,
1827	);
1828
1829	Ok((import, link))
1830}
1831
1832/// Parameters passed to [`import_queue`].
1833pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
1834	/// The BABE link that is created by [`block_import`].
1835	pub link: BabeLink<Block>,
1836	/// The block import that should be wrapped.
1837	pub block_import: BI,
1838	/// Optional justification import.
1839	pub justification_import: Option<BoxJustificationImport<Block>>,
1840	/// The client to interact with the internals of the node.
1841	pub client: Arc<Client>,
1842	/// Slot duration.
1843	pub slot_duration: SlotDuration,
1844	/// Spawner for spawning futures.
1845	pub spawner: &'a Spawn,
1846	/// Registry for prometheus metrics.
1847	pub registry: Option<&'a Registry>,
1848	/// Optional telemetry handle to report telemetry events.
1849	pub telemetry: Option<TelemetryHandle>,
1850}
1851
1852/// Start an import queue for the BABE consensus algorithm.
1853///
1854/// This method returns the import queue, some data that needs to be passed to the block authoring
1855/// logic (`BabeLink`), and a future that must be run to
1856/// completion and is responsible for listening to finality notifications and
1857/// pruning the epoch changes tree.
1858///
1859/// The block import object provided must be the `BabeBlockImport` or a wrapper
1860/// of it, otherwise crucial import logic will be omitted.
1861pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1862	ImportQueueParams {
1863		link: babe_link,
1864		block_import,
1865		justification_import,
1866		client,
1867		slot_duration,
1868		spawner,
1869		registry,
1870		telemetry,
1871	}: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1872) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1873where
1874	BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1875	Client: ProvideRuntimeApi<Block>
1876		+ HeaderBackend<Block>
1877		+ HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1878		+ AuxStore
1879		+ Send
1880		+ Sync
1881		+ 'static,
1882	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1883	Spawn: SpawnEssentialNamed,
1884{
1885	const HANDLE_BUFFER_SIZE: usize = 1024;
1886
1887	let verifier = BabeVerifier {
1888		slot_duration,
1889		config: babe_link.config.clone(),
1890		epoch_changes: babe_link.epoch_changes.clone(),
1891		telemetry,
1892		client: client.clone(),
1893	};
1894
1895	let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1896
1897	let answer_requests =
1898		answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1899
1900	spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1901
1902	Ok((
1903		BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1904		BabeWorkerHandle(worker_tx),
1905	))
1906}
1907
1908/// Reverts protocol aux data to at most the last finalized block.
1909/// In particular, epoch-changes and block weights announced after the revert
1910/// point are removed.
1911pub fn revert<Block, Client, Backend>(
1912	client: Arc<Client>,
1913	backend: Arc<Backend>,
1914	blocks: NumberFor<Block>,
1915) -> ClientResult<()>
1916where
1917	Block: BlockT,
1918	Client: AuxStore
1919		+ HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1920		+ HeaderBackend<Block>
1921		+ ProvideRuntimeApi<Block>
1922		+ UsageProvider<Block>,
1923	Client::Api: BabeApi<Block>,
1924	Backend: BackendT<Block>,
1925{
1926	let best_number = client.info().best_number;
1927	let finalized = client.info().finalized_number;
1928
1929	let revertible = blocks.min(best_number - finalized);
1930	if revertible == Zero::zero() {
1931		return Ok(());
1932	}
1933
1934	let revert_up_to_number = best_number - revertible;
1935	let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1936		format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1937	))?;
1938
1939	// Revert epoch changes tree.
1940
1941	// This config is only used on-genesis.
1942	let config = configuration(&*client)?;
1943	let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1944	let mut epoch_changes = epoch_changes.shared_data();
1945
1946	if revert_up_to_number == Zero::zero() {
1947		// Special case, no epoch changes data were present on genesis.
1948		*epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1949	} else {
1950		epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1951	}
1952
1953	// Remove block weights added after the revert point.
1954
1955	let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1956
1957	let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1958		pezsp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1959			.map(|route| route.retracted().is_empty())
1960			.unwrap_or_default()
1961	});
1962
1963	for leaf in leaves {
1964		let mut hash = leaf;
1965		loop {
1966			let meta = client.header_metadata(hash)?;
1967			if meta.number <= revert_up_to_number
1968				|| !weight_keys.insert(aux_schema::block_weight_key(hash))
1969			{
1970				// We've reached the revert point or an already processed branch, stop here.
1971				break;
1972			}
1973			hash = meta.parent;
1974		}
1975	}
1976
1977	let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1978
1979	// Write epoch changes and remove weights in one shot.
1980	aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1981		client.insert_aux(values, weight_keys.iter())
1982	})
1983}
1984
1985fn query_epoch_changes<Block, Client>(
1986	epoch_changes: &SharedEpochChanges<Block, Epoch>,
1987	client: &Client,
1988	config: &BabeConfiguration,
1989	block_number: NumberFor<Block>,
1990	slot: Slot,
1991	parent_hash: Block::Hash,
1992) -> Result<
1993	(ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
1994	Error<Block>,
1995>
1996where
1997	Block: BlockT,
1998	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = pezsp_blockchain::Error>,
1999{
2000	let epoch_changes = epoch_changes.shared_data();
2001	let epoch_descriptor = epoch_changes
2002		.epoch_descriptor_for_child_of(
2003			descendent_query(client),
2004			&parent_hash,
2005			block_number - 1u32.into(),
2006			slot,
2007		)
2008		.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
2009		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2010	let viable_epoch = epoch_changes
2011		.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2012		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2013	Ok((epoch_descriptor, viable_epoch.into_cloned()))
2014}