// sc_consensus_babe/lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! # BABE (Blind Assignment for Blockchain Extension)
20//!
21//! BABE is a slot-based block production mechanism which uses a VRF PRNG to
22//! randomly perform the slot allocation. On every slot, all the authorities
23//! generate a new random number with the VRF function and if it is lower than a
24//! given threshold (which is proportional to their weight/stake) they have a
25//! right to produce a block. The proof of the VRF function execution will be
//! used by other peers to validate the legitimacy of the slot claim.
27//!
28//! The engine is also responsible for collecting entropy on-chain which will be
29//! used to seed the given VRF PRNG. An epoch is a contiguous number of slots
30//! under which we will be using the same authority set. During an epoch all VRF
31//! outputs produced as a result of block production will be collected on an
32//! on-chain randomness pool. Epoch changes are announced one epoch in advance,
33//! i.e. when ending epoch N, we announce the parameters (randomness,
34//! authorities, etc.) for epoch N+2.
35//!
36//! Since the slot assignment is randomized, it is possible that a slot is
37//! assigned to multiple validators in which case we will have a temporary fork,
38//! or that a slot is assigned to no validator in which case no block is
39//! produced. Which means that block times are not deterministic.
40//!
41//! The protocol has a parameter `c` [0, 1] for which `1 - c` is the probability
42//! of a slot being empty. The choice of this parameter affects the security of
43//! the protocol relating to maximum tolerable network delays.
44//!
45//! In addition to the VRF-based slot assignment described above, which we will
46//! call primary slots, the engine also supports a deterministic secondary slot
47//! assignment. Primary slots take precedence over secondary slots, when
48//! authoring the node starts by trying to claim a primary slot and falls back
49//! to a secondary slot claim attempt. The secondary slot assignment is done
50//! by picking the authority at index:
51//!
52//! `blake2_256(epoch_randomness ++ slot_number) % authorities_len`.
53//!
//! Secondary slots support either a `SecondaryPlain` or `SecondaryVRF`
//! variant. Compared with the `SecondaryPlain` variant, the `SecondaryVRF` variant
56//! generates an additional VRF output. The output is not included in beacon
57//! randomness, but can be consumed by parachains.
58//!
59//! The fork choice rule is weight-based, where weight equals the number of
60//! primary blocks in the chain. We will pick the heaviest chain (more primary
61//! blocks) and will go with the longest one in case of a tie.
62//!
63//! An in-depth description and analysis of the protocol can be found here:
64//! <https://research.web3.foundation/Polkadot/protocols/block-production/Babe>
65
66#![forbid(unsafe_code)]
67#![warn(missing_docs)]
68
69use std::{
70	collections::HashSet,
71	future::Future,
72	ops::{Deref, DerefMut},
73	pin::Pin,
74	sync::Arc,
75	task::{Context, Poll},
76	time::Duration,
77};
78
79use codec::{Decode, Encode};
80use futures::{
81	channel::{
82		mpsc::{channel, Receiver, Sender},
83		oneshot,
84	},
85	prelude::*,
86};
87use log::{debug, info, log, trace, warn};
88use parking_lot::Mutex;
89use prometheus_endpoint::Registry;
90
91use sc_client_api::{
92	backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
93	PreCommitActions, UsageProvider,
94};
95use sc_consensus::{
96	block_import::{
97		BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
98		StateAction,
99	},
100	import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
101};
102use sc_consensus_epochs::{
103	descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
104	ViableEpochDescriptor,
105};
106use sc_consensus_slots::{
107	check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
108	SlotInfo, StorageChanges,
109};
110use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
111use sc_transaction_pool_api::OffchainTransactionPoolFactory;
112use sp_api::{ApiExt, ProvideRuntimeApi};
113use sp_application_crypto::AppCrypto;
114use sp_block_builder::BlockBuilder as BlockBuilderApi;
115use sp_blockchain::{
116	Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
117	Result as ClientResult,
118};
119use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
120use sp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
121use sp_consensus_slots::Slot;
122use sp_core::traits::SpawnEssentialNamed;
123use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
124use sp_keystore::KeystorePtr;
125use sp_runtime::{
126	generic::OpaqueDigestItemId,
127	traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
128	DigestItem,
129};
130
131pub use sc_consensus_slots::SlotProportion;
132pub use sp_consensus::SyncOracle;
133pub use sp_consensus_babe::{
134	digests::{
135		CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
136		PrimaryPreDigest, SecondaryPlainPreDigest,
137	},
138	AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
139	BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
140};
141
142pub use aux_schema::load_block_weight as block_weight;
143use sp_timestamp::Timestamp;
144
145mod migration;
146mod verification;
147
148pub mod authorship;
149pub mod aux_schema;
150#[cfg(test)]
151mod tests;
152
/// Logging target for all BABE-related log lines in this crate.
const LOG_TARGET: &str = "babe";

/// VRF context used for slots claiming lottery.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";

/// VRF output length for slots claiming lottery.
const AUTHORING_SCORE_LENGTH: usize = 16;
160
/// BABE epoch information.
///
/// Newtype wrapper around [`sp_consensus_babe::Epoch`] so that local traits
/// (such as `sc_consensus_epochs::Epoch`) can be implemented for it.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(sp_consensus_babe::Epoch);
164
// Give read access to the wrapped `sp_consensus_babe::Epoch` fields.
impl Deref for Epoch {
	type Target = sp_consensus_babe::Epoch;

	fn deref(&self) -> &Self::Target {
		&self.0
	}
}
172
// Give mutable access to the wrapped `sp_consensus_babe::Epoch` fields.
impl DerefMut for Epoch {
	fn deref_mut(&mut self) -> &mut Self::Target {
		&mut self.0
	}
}
178
// Cheap wrapping conversion from the primitives epoch type.
impl From<sp_consensus_babe::Epoch> for Epoch {
	fn from(epoch: sp_consensus_babe::Epoch) -> Self {
		Epoch(epoch)
	}
}
184
185impl EpochT for Epoch {
186	type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
187	type Slot = Slot;
188
189	fn increment(
190		&self,
191		(descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
192	) -> Epoch {
193		sp_consensus_babe::Epoch {
194			epoch_index: self.epoch_index + 1,
195			start_slot: self.start_slot + self.duration,
196			duration: self.duration,
197			authorities: descriptor.authorities,
198			randomness: descriptor.randomness,
199			config,
200		}
201		.into()
202	}
203
204	fn start_slot(&self) -> Slot {
205		self.start_slot
206	}
207
208	fn end_slot(&self) -> Slot {
209		self.start_slot + self.duration
210	}
211}
212
213impl Epoch {
214	/// Create the genesis epoch (epoch #0).
215	///
216	/// This is defined to start at the slot of the first block, so that has to be provided.
217	pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
218		sp_consensus_babe::Epoch {
219			epoch_index: 0,
220			start_slot: slot,
221			duration: genesis_config.epoch_length,
222			authorities: genesis_config.authorities.clone(),
223			randomness: genesis_config.randomness,
224			config: BabeEpochConfiguration {
225				c: genesis_config.c,
226				allowed_slots: genesis_config.allowed_slots,
227			},
228		}
229		.into()
230	}
231
232	/// Clone and tweak epoch information to refer to the specified slot.
233	///
234	/// All the information which depends on the slot value is recomputed and assigned
235	/// to the returned epoch instance.
236	///
237	/// The `slot` must be greater than or equal the original epoch start slot,
238	/// if is less this operation is equivalent to a simple clone.
239	pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
240		let mut epoch = self.clone();
241
242		let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;
243
244		let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
245			"epoch number is u64; it should be strictly smaller than number of slots; \
246				slots relate in some way to wall clock time; \
247				if u64 is not enough we should crash for safety; qed.",
248		);
249
250		let start_slot = skipped_epochs
251			.checked_mul(epoch.duration)
252			.and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
253			.expect(
254				"slot number is u64; it should relate in some way to wall clock time; \
255				 if u64 is not enough we should crash for safety; qed.",
256			);
257
258		epoch.epoch_index = epoch_index;
259		epoch.start_slot = Slot::from(start_slot);
260
261		epoch
262	}
263}
264
/// Errors encountered by the babe authorship task.
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
	/// Multiple BABE pre-runtime digests found in one header.
	#[error("Multiple BABE pre-runtime digests, rejecting!")]
	MultiplePreRuntimeDigests,
	/// No BABE pre-runtime digest found in the header.
	#[error("No BABE pre-runtime digest found")]
	NoPreRuntimeDigest,
	/// Multiple BABE epoch change digests found in one header.
	#[error("Multiple BABE epoch change digests, rejecting!")]
	MultipleEpochChangeDigests,
	/// Multiple BABE config change digests found in one header.
	#[error("Multiple BABE config change digests, rejecting!")]
	MultipleConfigChangeDigests,
	/// Could not extract timestamp and slot from the inherent data.
	#[error("Could not extract timestamp and slot: {0}")]
	Extraction(ConsensusError),
	/// Could not fetch epoch for the given block hash.
	#[error("Could not fetch epoch at {0:?}")]
	FetchEpoch(B::Hash),
	/// Header rejected: too far in the future.
	#[error("Header {0:?} rejected: too far in the future")]
	TooFarInFuture(B::Hash),
	/// Parent unavailable. Cannot import.
	#[error("Parent ({0}) of {1} unavailable. Cannot import")]
	ParentUnavailable(B::Hash, B::Hash),
	/// Slot number must increase relative to the parent block.
	#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
	SlotMustIncrease(Slot, Slot),
	/// Header has a bad seal.
	#[error("Header {0:?} has a bad seal")]
	HeaderBadSeal(B::Hash),
	/// Header is unsealed.
	#[error("Header {0:?} is unsealed")]
	HeaderUnsealed(B::Hash),
	/// Slot author not found.
	#[error("Slot author not found")]
	SlotAuthorNotFound,
	/// Secondary slot assignments are disabled for the current epoch.
	#[error("Secondary slot assignments are disabled for the current epoch.")]
	SecondarySlotAssignmentsDisabled,
	/// Bad signature on the given header.
	#[error("Bad signature on {0:?}")]
	BadSignature(B::Hash),
	/// Invalid author: expected the secondary author.
	#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
	InvalidAuthor(AuthorityId, AuthorityId),
	/// No secondary author expected.
	#[error("No secondary author expected.")]
	NoSecondaryAuthorExpected,
	/// VRF verification failed.
	#[error("VRF verification failed")]
	VrfVerificationFailed,
	/// Primary slot threshold too low: the VRF output exceeded it.
	#[error("VRF output rejected, threshold {0} exceeded")]
	VrfThresholdExceeded(u128),
	/// Could not fetch parent header.
	#[error("Could not fetch parent header: {0}")]
	FetchParentHeader(sp_blockchain::Error),
	/// Expected epoch change to happen at the given block/slot.
	#[error("Expected epoch change to happen at {0:?}, s{1}")]
	ExpectedEpochChange(B::Hash, Slot),
	/// Unexpected config change.
	#[error("Unexpected config change")]
	UnexpectedConfigChange,
	/// Unexpected epoch change.
	#[error("Unexpected epoch change")]
	UnexpectedEpochChange,
	/// Parent block has no associated weight stored in aux data.
	#[error("Parent block of {0} has no associated weight")]
	ParentBlockNoAssociatedWeight(B::Hash),
	/// Checking inherents failed.
	#[error("Checking inherents failed: {0}")]
	CheckInherents(sp_inherents::Error),
	/// Unhandled check-inherents error for the given inherent identifier.
	#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
	CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
	/// Creating inherents failed.
	#[error("Creating inherents failed: {0}")]
	CreateInherents(sp_inherents::Error),
	/// Background worker is not running and therefore requests cannot be answered.
	#[error("Background worker is not running")]
	BackgroundWorkerTerminated,
	/// Client error, forwarded verbatim.
	#[error(transparent)]
	Client(sp_blockchain::Error),
	/// Runtime API error, forwarded verbatim.
	#[error(transparent)]
	RuntimeApi(sp_api::ApiError),
	/// Fork tree error, forwarded verbatim (boxed: the inner error is large).
	#[error(transparent)]
	ForkTree(Box<fork_tree::Error<sp_blockchain::Error>>),
}
359
360impl<B: BlockT> From<Error<B>> for String {
361	fn from(error: Error<B>) -> String {
362		error.to_string()
363	}
364}
365
/// Log the given BABE error at debug level and hand it back unchanged,
/// allowing call sites to log and return in one expression.
fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
	debug!(target: LOG_TARGET, "{}", error);
	error
}
370
/// Intermediate value passed to block importer.
///
/// Stored in the import params under [`INTERMEDIATE_KEY`] so the importer can
/// recover the epoch the block belongs to.
pub struct BabeIntermediate<B: BlockT> {
	/// The epoch descriptor.
	pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}
376
/// Intermediate key for Babe engine, under which [`BabeIntermediate`] is
/// inserted into the block import params.
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
379
380/// Read configuration from the runtime state at current best block.
381pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
382where
383	C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
384	C::Api: BabeApi<B>,
385{
386	let at_hash = if client.usage_info().chain.finalized_state.is_some() {
387		client.usage_info().chain.best_hash
388	} else {
389		debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
390		client.usage_info().chain.genesis_hash
391	};
392
393	let runtime_api = client.runtime_api();
394	let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
395
396	let config = match version {
397		Some(1) => {
398			#[allow(deprecated)]
399			{
400				runtime_api.configuration_before_version_2(at_hash)?.into()
401			}
402		},
403		Some(2) => runtime_api.configuration(at_hash)?,
404		_ => {
405			return Err(sp_blockchain::Error::VersionInvalid(
406				"Unsupported or invalid BabeApi version".to_string(),
407			))
408		},
409	};
410	Ok(config)
411}
412
/// Parameters for BABE.
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
	/// The keystore that manages the keys of the node.
	pub keystore: KeystorePtr,

	/// The client to use
	pub client: Arc<C>,

	/// The SelectChain Strategy
	pub select_chain: SC,

	/// The environment we are producing blocks for.
	pub env: E,

	/// The underlying block-import object to supply our produced blocks to.
	/// This must be a `BabeBlockImport` or a wrapper of it, otherwise
	/// critical consensus logic will be omitted.
	pub block_import: I,

	/// A sync oracle
	pub sync_oracle: SO,

	/// Hook into the sync module to control the justification sync process.
	pub justification_sync_link: L,

	/// Something that can create the inherent data providers.
	pub create_inherent_data_providers: CIDP,

	/// Force authoring of blocks even if we are offline
	pub force_authoring: bool,

	/// Strategy and parameters for backing off block production.
	pub backoff_authoring_blocks: Option<BS>,

	/// Shared BABE state: epoch changes and the BABE configuration used by
	/// the authoring worker.
	pub babe_link: BabeLink<B>,

	/// The proportion of the slot dedicated to proposing.
	///
	/// The block proposing will be limited to this proportion of the slot from the starting of the
	/// slot. However, the proposing can still take longer when there is some lenience factor
	/// applied, because there were no blocks produced for some slots.
	pub block_proposal_slot_portion: SlotProportion,

	/// The maximum proportion of the slot dedicated to proposing with any lenience factor applied
	/// due to no blocks being produced.
	pub max_block_proposal_slot_portion: Option<SlotProportion>,

	/// Handle used to report telemetry.
	pub telemetry: Option<TelemetryHandle>,
}
464
/// Start the babe worker.
///
/// Builds a [`BabeSlotWorker`] from the given parameters, wires it into the
/// generic slot-worker machinery and returns a [`BabeWorker`] future that must
/// be polled to drive block authoring.
pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
	BabeParams {
		keystore,
		client,
		select_chain,
		env,
		block_import,
		sync_oracle,
		justification_sync_link,
		create_inherent_data_providers,
		force_authoring,
		backoff_authoring_blocks,
		babe_link,
		block_proposal_slot_portion,
		max_block_proposal_slot_portion,
		telemetry,
	}: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
) -> Result<BabeWorker<B>, ConsensusError>
where
	B: BlockT,
	C: ProvideRuntimeApi<B>
		+ HeaderBackend<B>
		+ HeaderMetadata<B, Error = ClientError>
		+ Send
		+ Sync
		+ 'static,
	C::Api: BabeApi<B>,
	SC: SelectChain<B> + 'static,
	E: Environment<B, Error = Error> + Send + Sync + 'static,
	E::Proposer: Proposer<B, Error = Error>,
	I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
	SO: SyncOracle + Send + Sync + Clone + 'static,
	L: sc_consensus::JustificationSyncLink<B> + 'static,
	CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
	// Sinks to which newly claimed slots are broadcast; shared with the
	// returned `BabeWorker` so subscribers can be registered later.
	let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

	let worker = BabeSlotWorker {
		client: client.clone(),
		block_import,
		env,
		sync_oracle: sync_oracle.clone(),
		justification_sync_link,
		force_authoring,
		backoff_authoring_blocks,
		keystore,
		epoch_changes: babe_link.epoch_changes.clone(),
		slot_notification_sinks: slot_notification_sinks.clone(),
		config: babe_link.config.clone(),
		block_proposal_slot_portion,
		max_block_proposal_slot_portion,
		telemetry,
	};

	info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");

	// Hand the worker over to the generic slot-based authoring loop.
	let slot_worker = sc_consensus_slots::start_slot_worker(
		babe_link.config.slot_duration(),
		select_chain,
		sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
		sync_oracle,
		create_inherent_data_providers,
	);

	Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
}
535
536// Remove obsolete block's weight data by leveraging finality notifications.
537// This includes data for all finalized blocks (excluding the most recent one)
538// and all stale branches.
539fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
540	client: &C,
541	notification: &FinalityNotification<Block>,
542) -> AuxDataOperations {
543	let mut hashes = HashSet::new();
544
545	let first = notification.tree_route.first().unwrap_or(&notification.hash);
546	match client.header_metadata(*first) {
547		Ok(meta) => {
548			hashes.insert(meta.parent);
549		},
550		Err(err) => {
551			warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
552		},
553	}
554
555	// Cleans data for finalized block's ancestors
556	hashes.extend(
557		notification
558			.tree_route
559			.iter()
560			// Ensure we don't prune latest finalized block.
561			// This should not happen, but better be safe than sorry!
562			.filter(|h| **h != notification.hash),
563	);
564
565	hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));
566
567	hashes
568		.into_iter()
569		.map(|val| (aux_schema::block_weight_key(val), None))
570		.collect()
571}
572
/// Background task answering [`BabeRequest`]s received on `request_rx`.
///
/// Runs until all request senders are dropped (the channel yields `None`).
/// Send errors on the response channels are ignored: the requester may have
/// given up waiting.
async fn answer_requests<B: BlockT, C>(
	mut request_rx: Receiver<BabeRequest<B>>,
	config: BabeConfiguration,
	client: Arc<C>,
	epoch_changes: SharedEpochChanges<B, Epoch>,
) where
	C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
{
	while let Some(request) = request_rx.next().await {
		match request {
			BabeRequest::EpochData(response) => {
				let _ = response.send(epoch_changes.shared_data().clone());
			},
			BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
				// Resolve the epoch a child of `parent_hash` at `slot` would
				// belong to, materializing the genesis epoch when needed.
				let lookup = || {
					let epoch_changes = epoch_changes.shared_data();
					epoch_changes
						.epoch_data_for_child_of(
							descendent_query(&*client),
							&parent_hash,
							parent_number,
							slot,
							|slot| Epoch::genesis(&config, slot),
						)
						.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
						.ok_or(Error::<B>::FetchEpoch(parent_hash))
				};

				let _ = response.send(lookup());
			},
		}
	}
}
606
/// Requests to the BABE service, answered by [`answer_requests`].
enum BabeRequest<B: BlockT> {
	/// Request all available epoch data.
	EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
	/// Request the epoch that a child of the given block, with the given slot number would have.
	///
	/// The parent block is identified by its hash and number.
	EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}
616
/// A handle to the BABE worker for issuing requests.
///
/// Cheap to clone; each clone shares the same request channel.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
620
impl<B: BlockT> BabeWorkerHandle<B> {
	/// Forward a request to the background worker.
	///
	/// A disconnected channel means the worker has terminated and is reported
	/// as `Error::BackgroundWorkerTerminated`; any other send error is only
	/// logged and treated as success.
	async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
		match self.0.clone().send(request).await {
			Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
			Err(err) => warn!(
				target: LOG_TARGET,
				"Unhandled error when sending request to worker: {:?}", err
			),
			_ => {},
		}

		Ok(())
	}

	/// Fetch all available epoch data.
	pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
		let (tx, rx) = oneshot::channel();
		self.send_request(BabeRequest::EpochData(tx)).await?;

		// A canceled oneshot means the worker dropped our response sender.
		rx.await.or(Err(Error::BackgroundWorkerTerminated))
	}

	/// Fetch the epoch that a child of the given block, with the given slot number would have.
	///
	/// The parent block is identified by its hash and number.
	pub async fn epoch_data_for_child_of(
		&self,
		parent_hash: B::Hash,
		parent_number: NumberFor<B>,
		slot: Slot,
	) -> Result<Epoch, Error<B>> {
		let (tx, rx) = oneshot::channel();
		self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
			.await?;

		// Outer error: worker terminated; inner result: the actual lookup.
		rx.await.or(Err(Error::BackgroundWorkerTerminated))?
	}
}
659
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
	// The boxed slot-worker future doing the actual authoring.
	inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
	// Sinks notified on every claimed slot; see `slot_notification_stream`.
	slot_notification_sinks: SlotNotificationSinks<B>,
}
666
667impl<B: BlockT> BabeWorker<B> {
668	/// Return an event stream of notifications for when new slot happens, and the corresponding
669	/// epoch descriptor.
670	pub fn slot_notification_stream(
671		&self,
672	) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
673		const CHANNEL_BUFFER_SIZE: usize = 1024;
674
675		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
676		self.slot_notification_sinks.lock().push(sink);
677		stream
678	}
679}
680
// Delegate polling to the boxed inner slot-worker future.
impl<B: BlockT> Future for BabeWorker<B> {
	type Output = ();

	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
		self.inner.as_mut().poll(cx)
	}
}
688
/// Slot notification sinks.
///
/// Shared, lock-protected list of channel senders that receive
/// `(slot, epoch descriptor)` pairs whenever a slot is processed.
type SlotNotificationSinks<B> = Arc<
	Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;
693
/// State of the BABE authoring worker; implements
/// `sc_consensus_slots::SimpleSlotWorker`.
struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
	// Client used for chain and runtime queries.
	client: Arc<C>,
	// Where produced blocks are submitted.
	block_import: I,
	// Proposer environment.
	env: E,
	// Tells us whether we are currently syncing.
	sync_oracle: SO,
	// Hook into justification sync.
	justification_sync_link: L,
	// Author even when (apparently) offline.
	force_authoring: bool,
	// Optional back-off strategy for block production.
	backoff_authoring_blocks: Option<BS>,
	// Keys used for claiming slots and signing blocks.
	keystore: KeystorePtr,
	// Shared view of scheduled epoch changes.
	epoch_changes: SharedEpochChanges<B, Epoch>,
	// Subscribers notified on each processed slot.
	slot_notification_sinks: SlotNotificationSinks<B>,
	// The BABE configuration (genesis parameters).
	config: BabeConfiguration,
	// Portion of the slot reserved for proposing.
	block_proposal_slot_portion: SlotProportion,
	// Upper bound on the proposing portion after lenience.
	max_block_proposal_slot_portion: Option<SlotProportion>,
	// Telemetry reporting handle.
	telemetry: Option<TelemetryHandle>,
}
710
#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
	for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
	B: BlockT,
	C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
	C::Api: BabeApi<B>,
	E: Environment<B, Error = Error> + Send + Sync,
	E::Proposer: Proposer<B, Error = Error>,
	I: BlockImport<B> + Send + Sync + 'static,
	SO: SyncOracle + Send + Clone + Sync,
	L: sc_consensus::JustificationSyncLink<B>,
	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
	// A claim is the pre-digest to include in the block plus the authority
	// that made the claim (and will sign the block).
	type Claim = (PreDigest, AuthorityId);
	type SyncOracle = SO;
	type JustificationSyncLink = L;
	type CreateProposer =
		Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
	type Proposer = E::Proposer;
	type BlockImport = I;
	// The aux data is the descriptor of the epoch the new block belongs to.
	type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;

	fn logging_target(&self) -> &'static str {
		LOG_TARGET
	}

	fn block_import(&mut self) -> &mut Self::BlockImport {
		&mut self.block_import
	}

	/// Resolve the epoch descriptor for a child of `parent` produced at `slot`.
	fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
		self.epoch_changes
			.shared_data()
			.epoch_descriptor_for_child_of(
				descendent_query(&*self.client),
				&parent.hash(),
				*parent.number(),
				slot,
			)
			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
			.ok_or(ConsensusError::InvalidAuthoritiesSet)
	}

	/// Number of authorities in the epoch described by `epoch_descriptor`,
	/// or `None` if the epoch cannot be materialized.
	fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
		self.epoch_changes
			.shared_data()
			.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
			.map(|epoch| epoch.as_ref().authorities.len())
	}

	/// Try to claim `slot` with one of our keystore keys; `None` when we are
	/// not eligible for this slot.
	async fn claim_slot(
		&mut self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) -> Option<Self::Claim> {
		debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
		let s = authorship::claim_slot(
			slot,
			self.epoch_changes
				.shared_data()
				.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
				.as_ref(),
			&self.keystore,
		);

		if s.is_some() {
			debug!(target: LOG_TARGET, "Claimed slot {}", slot);
		}

		s
	}

	/// Broadcast the slot and its epoch descriptor to all registered sinks,
	/// dropping sinks whose receiving side has gone away.
	fn notify_slot(
		&self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) {
		let sinks = &mut self.slot_notification_sinks.lock();
		sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
			Ok(()) => true,
			Err(e) => {
				// A full channel is kept (subscriber is slow, not gone);
				// any other send error means the receiver was dropped.
				if e.is_full() {
					warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
					true
				} else {
					false
				}
			},
		});
	}

	fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<sp_runtime::DigestItem> {
		vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
	}

	/// Sign the block hash with the claiming authority's key and assemble the
	/// import params (seal digest, body, state changes, epoch intermediate).
	async fn block_import_params(
		&self,
		header: B::Header,
		header_hash: &B::Hash,
		body: Vec<B::Extrinsic>,
		storage_changes: StorageChanges<B>,
		(_, public): Self::Claim,
		epoch_descriptor: Self::AuxData,
	) -> Result<BlockImportParams<B>, ConsensusError> {
		let signature = self
			.keystore
			.sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
			.map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
			.ok_or_else(|| {
				ConsensusError::CannotSign(format!(
					"Could not find key in keystore. Key: {:?}",
					public
				))
			})?;

		let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());

		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
		import_block.post_digests.push(digest_item);
		import_block.body = Some(body);
		import_block.state_action =
			StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
		// The importer recovers the epoch via this intermediate.
		import_block
			.insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });

		Ok(import_block)
	}

	fn force_authoring(&self) -> bool {
		self.force_authoring
	}

	/// Delegate to the configured back-off strategy, using the slot of the
	/// chain head found via its pre-digest. No back-off without a strategy or
	/// when the head's pre-digest cannot be read.
	fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
		if let Some(ref strategy) = self.backoff_authoring_blocks {
			if let Ok(chain_head_slot) =
				find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
			{
				return strategy.should_backoff(
					*chain_head.number(),
					chain_head_slot,
					self.client.info().finalized_number,
					slot,
					self.logging_target(),
				);
			}
		}
		false
	}

	fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
		&mut self.sync_oracle
	}

	fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
		&mut self.justification_sync_link
	}

	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
		Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
	}

	fn telemetry(&self) -> Option<TelemetryHandle> {
		self.telemetry.clone()
	}

	/// Remaining proposal time for this slot, with exponential lenience when
	/// previous slots produced no blocks.
	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
		let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

		sc_consensus_slots::proposing_remaining_duration(
			parent_slot,
			slot_info,
			&self.block_proposal_slot_portion,
			self.max_block_proposal_slot_portion.as_ref(),
			sc_consensus_slots::SlotLenienceType::Exponential,
			self.logging_target(),
		)
	}
}
893
894/// Extract the BABE pre digest from the given header. Pre-runtime digests are
895/// mandatory, the function will return `Err` if none is found.
896pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
897	// genesis block doesn't contain a pre digest so let's generate a
898	// dummy one to not break any invariants in the rest of the code
899	if header.number().is_zero() {
900		return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
901			slot: 0.into(),
902			authority_index: 0,
903		}));
904	}
905
906	let mut pre_digest: Option<_> = None;
907	for log in header.digest().logs() {
908		trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
909		match (log.as_babe_pre_digest(), pre_digest.is_some()) {
910			(Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
911			(None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
912			(s, false) => pre_digest = s,
913		}
914	}
915	pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
916}
917
918/// Check whether the given header contains a BABE epoch change digest.
919pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
920	find_next_epoch_digest::<B>(header).ok().flatten().is_some()
921}
922
923/// Extract the BABE epoch change digest from the given header, if it exists.
924pub fn find_next_epoch_digest<B: BlockT>(
925	header: &B::Header,
926) -> Result<Option<NextEpochDescriptor>, Error<B>> {
927	let mut epoch_digest: Option<_> = None;
928	for log in header.digest().logs() {
929		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
930		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
931		match (log, epoch_digest.is_some()) {
932			(Some(ConsensusLog::NextEpochData(_)), true) => {
933				return Err(babe_err(Error::MultipleEpochChangeDigests))
934			},
935			(Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
936			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
937		}
938	}
939
940	Ok(epoch_digest)
941}
942
943/// Extract the BABE config change digest from the given header, if it exists.
944fn find_next_config_digest<B: BlockT>(
945	header: &B::Header,
946) -> Result<Option<NextConfigDescriptor>, Error<B>> {
947	let mut config_digest: Option<_> = None;
948	for log in header.digest().logs() {
949		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
950		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
951		match (log, config_digest.is_some()) {
952			(Some(ConsensusLog::NextConfigData(_)), true) => {
953				return Err(babe_err(Error::MultipleConfigChangeDigests))
954			},
955			(Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
956			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
957		}
958	}
959
960	Ok(config_digest)
961}
962
/// State that must be shared between the import queue and the authoring logic.
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
	// Tree of epoch transitions across all forks, shared behind a lock.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// The BABE chain configuration.
	config: BabeConfiguration,
}
969
impl<Block: BlockT> BabeLink<Block> {
	/// Get the epoch changes of this link.
	///
	/// This is the shared (locked) tree of epoch transitions across all forks.
	pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
		&self.epoch_changes
	}

	/// Get the BABE configuration of this link.
	pub fn config(&self) -> &BabeConfiguration {
		&self.config
	}
}
981
/// A verifier for Babe blocks.
pub struct BabeVerifier<Block: BlockT, Client> {
	// Client handle used for epoch lookups during verification.
	client: Arc<Client>,
	// Slot duration, used to derive the current slot from wall-clock time.
	slot_duration: SlotDuration,
	// The BABE chain configuration.
	config: BabeConfiguration,
	// Shared tree of epoch changes, used to find the epoch a block belongs to.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// Optional telemetry handle for reporting verification events.
	telemetry: Option<TelemetryHandle>,
}
990
#[async_trait::async_trait]
impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
where
	Block: BlockT,
	Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ HeaderBackend<Block>
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync
		+ AuxStore,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
{
	/// Verify the BABE seal and slot claim of an incoming block.
	///
	/// On success the seal is stripped from the header (and pushed onto
	/// `post_digests`), and the epoch descriptor the block was verified under
	/// is attached as an intermediate for the block import to consume.
	async fn verify(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<BlockImportParams<Block>, String> {
		trace!(
			target: LOG_TARGET,
			"Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
			block.origin,
			block.header,
			block.justifications,
			block.body,
		);

		let hash = block.header.hash();
		let parent_hash = *block.header.parent_hash();

		let number = block.header.number();

		// Warp-synced blocks, state imports and blocks inside the initial-sync
		// gap are trusted and bypass BABE verification entirely
		// (see `should_skip_verification`).
		if should_skip_verification(&*self.client, &block) {
			return Ok(block);
		}

		debug!(
			target: LOG_TARGET,
			"We have {:?} logs in this header",
			block.header.digest().logs().len()
		);

		// Current slot derived from wall-clock time, used to reject blocks
		// claiming slots too far in the future.
		let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);

		let pre_digest = find_pre_digest::<Block>(&block.header)?;
		let (check_header, epoch_descriptor) = {
			// Find the epoch this block's slot belongs to; the claim is
			// verified against that epoch's authorities.
			let (epoch_descriptor, viable_epoch) = query_epoch_changes(
				&self.epoch_changes,
				self.client.as_ref(),
				&self.config,
				*number,
				pre_digest.slot(),
				parent_hash,
			)?;

			// We add one to the current slot to allow for some small drift.
			// FIXME #1019 in the future, alter this queue to allow deferring of headers
			let v_params = verification::VerificationParams {
				header: block.header.clone(),
				pre_digest: Some(pre_digest),
				slot_now: slot_now + 1,
				epoch: viable_epoch.as_ref(),
			};

			(verification::check_header::<Block>(v_params)?, epoch_descriptor)
		};

		match check_header {
			CheckedHeader::Checked(pre_header, verified_info) => {
				trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
				telemetry!(
					self.telemetry;
					CONSENSUS_TRACE;
					"babe.checked_and_importing";
					"pre_header" => ?pre_header,
				);

				// Replace the header with the seal-less pre-header; the seal
				// travels separately in `post_digests`.
				block.header = pre_header;
				block.post_digests.push(verified_info.seal);
				block.insert_intermediate(
					INTERMEDIATE_KEY,
					BabeIntermediate::<Block> { epoch_descriptor },
				);
				block.post_hash = Some(hash);

				Ok(block)
			},
			CheckedHeader::Deferred(a, b) => {
				// The header claims a slot ahead of our clock (plus drift):
				// reject it as too far in the future.
				debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
				telemetry!(
					self.telemetry;
					CONSENSUS_DEBUG;
					"babe.header_too_far_in_future";
					"hash" => ?hash, "a" => ?a, "b" => ?b
				);
				Err(Error::<Block>::TooFarInFuture(hash).into())
			},
		}
	}
}
1089
1090/// Verification for imported blocks is skipped in three cases:
1091/// 1. When importing blocks below the last finalized block during network initial synchronization.
1092/// 2. When importing whole state we don't calculate epoch descriptor, but rather read it from the
1093///    state after import. We also skip all verifications because there's no parent state and we
1094///    trust the sync module to verify that the state is correct and finalized.
1095/// 3. When importing warp sync blocks that have already been verified via warp sync proof.
1096fn should_skip_verification<B: BlockT>(
1097	client: &impl HeaderBackend<B>,
1098	block: &BlockImportParams<B>,
1099) -> bool {
1100	block.origin == BlockOrigin::WarpSync || block.with_state() || {
1101		let number = *block.header.number();
1102		let info = client.info();
1103		info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
1104	}
1105}
1106
/// A block-import handler for BABE.
///
/// This scans each imported block for epoch change signals. The signals are
/// tracked in a tree (of all forks), and the import logic validates all epoch
/// change transitions, i.e. whether a given epoch change is expected or whether
/// it is missing.
///
/// The epoch change tree should be pruned as blocks are finalized.
pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
	// The wrapped block import that performs the actual import.
	inner: I,
	// Client handle used for header/weight lookups and runtime API access.
	client: Arc<Client>,
	// Tree of epoch changes across all forks, shared behind a lock.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// Used to build inherent data (e.g. the current slot) for inherent checks.
	create_inherent_data_providers: CIDP,
	// The BABE chain configuration.
	config: BabeConfiguration,
	// A [`SelectChain`] implementation.
	//
	// Used to determine the best block that should be used as basis when sending an equivocation
	// report.
	select_chain: SC,
	// The offchain transaction pool factory.
	//
	// Will be used when sending equivocation reports.
	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}
1131
1132impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
1133	for BabeBlockImport<Block, Client, I, CIDP, SC>
1134{
1135	fn clone(&self) -> Self {
1136		BabeBlockImport {
1137			inner: self.inner.clone(),
1138			client: self.client.clone(),
1139			epoch_changes: self.epoch_changes.clone(),
1140			config: self.config.clone(),
1141			create_inherent_data_providers: self.create_inherent_data_providers.clone(),
1142			select_chain: self.select_chain.clone(),
1143			offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
1144		}
1145	}
1146}
1147
1148impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
1149	fn new(
1150		client: Arc<Client>,
1151		epoch_changes: SharedEpochChanges<Block, Epoch>,
1152		block_import: I,
1153		config: BabeConfiguration,
1154		create_inherent_data_providers: CIDP,
1155		select_chain: SC,
1156		offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1157	) -> Self {
1158		BabeBlockImport {
1159			client,
1160			inner: block_import,
1161			epoch_changes,
1162			config,
1163			create_inherent_data_providers,
1164			select_chain,
1165			offchain_tx_pool_factory,
1166		}
1167	}
1168}
1169
impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
	Block: BlockT,
	Inner: BlockImport<Block> + Send + Sync,
	Inner::Error: Into<ConsensusError>,
	Client: HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ AuxStore
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
	CIDP: CreateInherentDataProviders<Block, ()>,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
	SC: sp_consensus::SelectChain<Block> + 'static,
{
	/// Import whole state after warp sync.
	// This function makes multiple transactions to the DB. If one of them fails we may
	// end up in an inconsistent state and have to resync.
	async fn import_state(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<ImportResult, ConsensusError> {
		let hash = block.post_hash();
		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();

		// Force the warp target to become the new best block.
		block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
		// Reset block weight.
		aux_schema::write_block_weight(hash, 0, |values| {
			block
				.auxiliary
				.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
		});

		// First make the client import the state.
		let import_result = self.inner.import_block(block).await;
		let aux = match import_result {
			Ok(ImportResult::Imported(aux)) => aux,
			Ok(r) => {
				return Err(ConsensusError::ClientImport(format!(
					"Unexpected import result: {:?}",
					r
				)))
			},
			Err(r) => return Err(r.into()),
		};

		// Read epoch info from the imported state.
		let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;
		let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;

		// Rebuild the epoch-changes tree around the warp target and persist
		// it, replacing whatever was tracked before.
		let mut epoch_changes = self.epoch_changes.shared_data_locked();
		epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
		aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
			self.client.insert_aux(insert, [])
		})
		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

		Ok(ImportResult::Imported(aux))
	}

	/// Check the inherents and equivocations.
	///
	/// Inherent failures reject the block; equivocation reporting failures are
	/// only logged (best-effort).
	async fn check_inherents_and_equivocations(
		&self,
		block: &mut BlockImportParams<Block>,
	) -> Result<(), ConsensusError> {
		// Blocks that skip verification (warp sync, state import, initial-sync
		// gap) skip these checks as well.
		if should_skip_verification(&*self.client, block) {
			return Ok(());
		}

		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();

		let create_inherent_data_providers = self
			.create_inherent_data_providers
			.create_inherent_data_providers(parent_hash, ())
			.await?;

		let slot_now = create_inherent_data_providers.slot();

		let babe_pre_digest = find_pre_digest::<Block>(&block.header)
			.map_err(|e| ConsensusError::Other(Box::new(e)))?;
		let slot = babe_pre_digest.slot();

		// Check inherents.
		self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
			.await?;

		// Check for equivocation and report it to the runtime if needed.
		let author = {
			let viable_epoch = query_epoch_changes(
				&self.epoch_changes,
				self.client.as_ref(),
				&self.config,
				number,
				slot,
				parent_hash,
			)
			.map_err(|e| ConsensusError::Other(babe_err(e).into()))?
			.1;
			// Resolve the claimed authority index to its public key within the
			// epoch the block belongs to.
			match viable_epoch
				.as_ref()
				.authorities
				.get(babe_pre_digest.authority_index() as usize)
			{
				Some(author) => author.0.clone(),
				None => {
					return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into()))
				},
			}
		};
		// A failed equivocation check/report must not reject the block: log
		// and carry on with the import.
		if let Err(err) = self
			.check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
			.await
		{
			warn!(
				target: LOG_TARGET,
				"Error checking/reporting BABE equivocation: {}", err
			);
		}
		Ok(())
	}

	/// Check the block body's inherents (when a body is present) against the
	/// slot claimed in the seal, using the runtime at `at_hash`.
	async fn check_inherents(
		&self,
		block: &mut BlockImportParams<Block>,
		at_hash: Block::Hash,
		slot: Slot,
		create_inherent_data_providers: CIDP::InherentDataProviders,
	) -> Result<(), ConsensusError> {
		if block.state_action.skip_execution_checks() {
			return Ok(());
		}

		if let Some(inner_body) = block.body.take() {
			let new_block = Block::new(block.header.clone(), inner_body);
			// if the body is passed through and the block was executed,
			// we need to use the runtime to check that the internally-set
			// timestamp in the inherents actually matches the slot set in the seal.
			let mut inherent_data = create_inherent_data_providers
				.create_inherent_data()
				.await
				.map_err(|e| ConsensusError::Other(Box::new(e)))?;
			inherent_data.babe_replace_inherent_data(slot);

			use sp_block_builder::CheckInherentsError;

			sp_block_builder::check_inherents_with_data(
				self.client.clone(),
				at_hash,
				new_block.clone(),
				&create_inherent_data_providers,
				inherent_data,
			)
			.await
			.map_err(|e| {
				ConsensusError::Other(Box::new(match e {
					CheckInherentsError::CreateInherentData(e) => {
						Error::<Block>::CreateInherents(e)
					},
					CheckInherentsError::Client(e) => Error::RuntimeApi(e),
					CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
					CheckInherentsError::CheckInherentsUnknownError(id) => {
						Error::CheckInherentsUnhandled(id)
					},
				}))
			})?;
			// Hand the body back to the import params after the check.
			let (_, inner_body) = new_block.deconstruct();
			block.body = Some(inner_body);
		}

		Ok(())
	}

	/// Check whether `header` equivocates against another header authored by
	/// `author` for the same `slot` and, if so, submit an equivocation report
	/// extrinsic to the runtime at the current best block.
	async fn check_and_report_equivocation(
		&self,
		slot_now: Slot,
		slot: Slot,
		header: &Block::Header,
		author: &AuthorityId,
		origin: &BlockOrigin,
	) -> Result<(), Error<Block>> {
		// don't report any equivocations during initial sync
		// as they are most likely stale.
		if *origin == BlockOrigin::NetworkInitialSync {
			return Ok(());
		}

		// check if authorship of this header is an equivocation and return a proof if so.
		let Some(equivocation_proof) =
			check_equivocation(&*self.client, slot_now, slot, header, author)
				.map_err(Error::Client)?
		else {
			return Ok(());
		};

		info!(
			target: LOG_TARGET,
			"Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
			author,
			slot,
			equivocation_proof.first_header.hash(),
			equivocation_proof.second_header.hash(),
		);

		// get the best block on which we will build and send the equivocation report.
		let best_hash = self
			.select_chain
			.best_chain()
			.await
			.map(|h| h.hash())
			.map_err(|e| Error::Client(e.into()))?;

		// generate a key ownership proof. we start by trying to generate the
		// key ownership proof at the parent of the equivocating header, this
		// will make sure that proof generation is successful since it happens
		// during the on-going session (i.e. session keys are available in the
		// state to be able to generate the proof). this might fail if the
		// equivocation happens on the first block of the session, in which case
		// its parent would be on the previous session. if generation on the
		// parent header fails we try with best block as well.
		let generate_key_owner_proof = |at_hash: Block::Hash| {
			self.client
				.runtime_api()
				.generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
				.map_err(Error::RuntimeApi)
		};

		let parent_hash = *header.parent_hash();
		let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
			Some(proof) => proof,
			None => match generate_key_owner_proof(best_hash)? {
				Some(proof) => proof,
				None => {
					debug!(
						target: LOG_TARGET,
						"Equivocation offender is not part of the authority set."
					);
					return Ok(());
				},
			},
		};

		// submit equivocation report at best block.
		let mut runtime_api = self.client.runtime_api();

		// Register the offchain tx pool to be able to use it from the runtime.
		runtime_api
			.register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));

		runtime_api
			.submit_report_equivocation_unsigned_extrinsic(
				best_hash,
				equivocation_proof,
				key_owner_proof,
			)
			.map_err(Error::RuntimeApi)?;

		info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);

		Ok(())
	}
}
1438
#[async_trait::async_trait]
impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
	for BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
	Block: BlockT,
	Inner: BlockImport<Block> + Send + Sync,
	Inner::Error: Into<ConsensusError>,
	Client: HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ AuxStore
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
	CIDP: CreateInherentDataProviders<Block, ()>,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
	SC: SelectChain<Block> + 'static,
{
	type Error = ConsensusError;

	/// Import a block, tracking any epoch change it announces and applying the
	/// BABE fork-choice rule (heaviest chain wins, longest chain breaks ties).
	async fn import_block(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		let hash = block.post_hash();
		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();
		let info = self.client.info();

		self.check_inherents_and_equivocations(&mut block).await?;

		let block_status = self
			.client
			.status(hash)
			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

		// Skip babe logic if block already in chain or importing blocks during initial sync,
		// otherwise the check for epoch changes will error because trying to re-import an
		// epoch change or because of missing epoch data in the tree, respectively.
		if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
			block_status == BlockStatus::InChain
		{
			// When re-importing existing block strip away intermediates.
			// In case of initial sync intermediates should not be present...
			let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
			block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
			return self.inner.import_block(block).await.map_err(Into::into);
		}

		// Whole-state imports (warp sync targets) take a separate path.
		if block.with_state() {
			return self.import_state(block).await;
		}

		let pre_digest = find_pre_digest::<Block>(&block.header).expect(
			"valid babe headers must contain a predigest; header has been already verified; qed",
		);
		let slot = pre_digest.slot();

		// If there's a pending epoch we'll save the previous epoch changes here
		// this way we can revert it if there's any error.
		let mut old_epoch_changes = None;

		// Skip epoch change processing for warp synced blocks
		let epoch_changes = if block.origin != BlockOrigin::WarpSync {
			let parent_header = self
				.client
				.header(parent_hash)
				.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
				.ok_or_else(|| {
					ConsensusError::ChainLookup(
						babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
					)
				})?;

			let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
				"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
				 been verified; qed",
			);

			// make sure that slot number is strictly increasing
			if slot <= parent_slot {
				return Err(ConsensusError::ClientImport(
					babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
				));
			}

			// Hold the epoch-changes lock for the whole remaining import so
			// concurrent imports cannot observe a half-updated tree.
			let mut epoch_changes = self.epoch_changes.shared_data_locked();

			// check if there's any epoch change expected to happen at this slot.
			// `epoch` is the epoch to verify the block under, and `first_in_epoch` is true
			// if this is the first block in its chain for that epoch.
			//
			// also provides the total weight of the chain, including the imported block.
			let (epoch_descriptor, first_in_epoch, parent_weight) = {
				let parent_weight = if *parent_header.number() == Zero::zero() {
					0
				} else {
					aux_schema::load_block_weight(&*self.client, parent_hash)
						.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
						.ok_or_else(|| {
							ConsensusError::ClientImport(
								babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
									.into(),
							)
						})?
				};

				// The verifier stored the epoch descriptor it verified the
				// block under as an intermediate; consume it here.
				let intermediate =
					block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;

				let epoch_descriptor = intermediate.epoch_descriptor;
				let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
				(epoch_descriptor, first_in_epoch, parent_weight)
			};

			let total_weight = parent_weight + pre_digest.added_weight();

			// search for this all the time so we can reject unexpected announcements.
			let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
			let next_config_digest = find_next_config_digest::<Block>(&block.header)
				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

			// An epoch-change digest must appear exactly on the first block of
			// an epoch, and a config-change digest only alongside one.
			match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
				(true, true, _) => {},
				(false, false, false) => {},
				(false, false, true) => {
					return Err(ConsensusError::ClientImport(
						babe_err(Error::<Block>::UnexpectedConfigChange).into(),
					))
				},
				(true, false, _) => {
					return Err(ConsensusError::ClientImport(
						babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
					))
				},
				(false, true, _) => {
					return Err(ConsensusError::ClientImport(
						babe_err(Error::<Block>::UnexpectedEpochChange).into(),
					))
				},
			}

			if let Some(next_epoch_descriptor) = next_epoch_digest {
				// Snapshot the tree so it can be restored on any failure below
				// or on a failed inner import.
				old_epoch_changes = Some((*epoch_changes).clone());

				let mut viable_epoch = epoch_changes
					.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
					.ok_or_else(|| {
						ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
					})?
					.into_cloned();

				let epoch_config = next_config_digest
					.map(Into::into)
					.unwrap_or_else(|| viable_epoch.as_ref().config.clone());

				// restrict info logging during initial sync to avoid spam
				let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
					log::Level::Debug
				} else {
					log::Level::Info
				};

				if viable_epoch.as_ref().end_slot() <= slot {
					// Some epochs must have been skipped as our current slot fits outside the
					// current epoch. We will figure out which epoch it belongs to and we will
					// re-use the same data for that epoch.
					// Notice that we are only updating a local copy of the `Epoch`, this
					// makes it so that when we insert the next epoch into `EpochChanges` below
					// (after incrementing it), it will use the correct epoch index and start
					// slot. We do not update the original epoch that will be re-used
					// because there might be other forks (that we haven't imported) where
					// the epoch isn't skipped, and to import those forks we want to keep
					// the original epoch data. Not updating the original epoch works
					// because when we search the tree for which epoch to use for a given
					// slot, we will search in-depth with the predicate `epoch.start_slot
					// <= slot` which will still match correctly without updating
					// `start_slot` to the correct value as below.
					let epoch = viable_epoch.as_mut();
					let prev_index = epoch.epoch_index;
					*epoch = epoch.clone_for_slot(slot);

					warn!(
						target: LOG_TARGET,
						"👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
					);
				}

				log!(
					target: LOG_TARGET,
					log_level,
					"👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
					viable_epoch.as_ref().epoch_index,
					hash,
					slot,
					viable_epoch.as_ref().start_slot,
				);

				let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));

				log!(
					target: LOG_TARGET,
					log_level,
					"👶 Next epoch starts at slot {}",
					next_epoch.as_ref().start_slot,
				);

				// prune the tree of epochs not part of the finalized chain or
				// that are not live anymore, and then track the given epoch change
				// in the tree.
				// NOTE: it is important that these operations are done in this
				// order, otherwise if pruning after import the `is_descendent_of`
				// used by pruning may not know about the block that is being
				// imported.
				let prune_and_import = || {
					prune_finalized(self.client.clone(), &mut epoch_changes)?;

					epoch_changes
						.import(
							descendent_query(&*self.client),
							hash,
							number,
							*block.header.parent_hash(),
							next_epoch,
						)
						.map_err(|e| {
							ConsensusError::ClientImport(format!(
								"Error importing epoch changes: {}",
								e
							))
						})?;
					Ok(())
				};

				if let Err(e) = prune_and_import() {
					debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
					// Roll the in-memory tree back to the pre-import snapshot.
					*epoch_changes =
						old_epoch_changes.expect("set `Some` above and not taken; qed");
					return Err(e);
				}

				crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
					block
						.auxiliary
						.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
				});
			}

			// Persist the chain weight of this block alongside the block data.
			aux_schema::write_block_weight(hash, total_weight, |values| {
				block
					.auxiliary
					.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
			});

			// The fork choice rule is that we pick the heaviest chain (i.e.
			// more primary blocks), if there's a tie we go with the longest
			// chain.
			block.fork_choice = {
				let (last_best, last_best_number) = (info.best_hash, info.best_number);

				let last_best_weight = if &last_best == block.header.parent_hash() {
					// the parent=genesis case is already covered for loading parent weight,
					// so we don't need to cover again here.
					parent_weight
				} else {
					aux_schema::load_block_weight(&*self.client, last_best)
						.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
						.ok_or_else(|| {
							ConsensusError::ChainLookup(
								"No block weight for parent header.".to_string(),
							)
						})?
				};

				Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
					true
				} else if total_weight == last_best_weight {
					number > last_best_number
				} else {
					false
				}))
			};

			// Release the mutex, but it stays locked
			Some(epoch_changes.release_mutex())
		} else {
			block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
			None
		};

		let import_result = self.inner.import_block(block).await;

		// revert to the original epoch changes in case there's an error
		// importing the block
		if import_result.is_err() {
			if let (Some(mut epoch_changes), Some(old_epoch_changes)) =
				(epoch_changes, old_epoch_changes)
			{
				*epoch_changes.upgrade() = old_epoch_changes;
			}
		}

		import_result.map_err(Into::into)
	}

	/// Delegate block pre-checks to the wrapped block import.
	async fn check_block(
		&self,
		block: BlockCheckParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.inner.check_block(block).await.map_err(Into::into)
	}
}
1752
1753/// Gets the best finalized block and its slot, and prunes the given epoch tree.
1754fn prune_finalized<Block, Client>(
1755	client: Arc<Client>,
1756	epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1757) -> Result<(), ConsensusError>
1758where
1759	Block: BlockT,
1760	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
1761{
1762	let info = client.info();
1763
1764	let finalized_slot = {
1765		let finalized_header = client
1766			.header(info.finalized_hash)
1767			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1768			.expect(
1769				"best finalized hash was given by client; finalized headers must exist in db; qed",
1770			);
1771
1772		find_pre_digest::<Block>(&finalized_header)
1773			.expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1774			.slot()
1775	};
1776
1777	epoch_changes
1778		.prune_finalized(
1779			descendent_query(&*client),
1780			&info.finalized_hash,
1781			info.finalized_number,
1782			finalized_slot,
1783		)
1784		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1785
1786	Ok(())
1787}
1788
1789/// Produce a BABE block-import object to be used later on in the construction of
1790/// an import-queue.
1791///
1792/// Also returns a link object used to correctly instantiate the import queue
1793/// and background worker.
1794pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1795	config: BabeConfiguration,
1796	wrapped_block_import: I,
1797	client: Arc<Client>,
1798	create_inherent_data_providers: CIDP,
1799	select_chain: SC,
1800	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1801) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1802where
1803	Client: AuxStore
1804		+ HeaderBackend<Block>
1805		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1806		+ PreCommitActions<Block>
1807		+ 'static,
1808{
1809	let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1810	let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1811
1812	// NOTE: this isn't entirely necessary, but since we didn't use to prune the
1813	// epoch tree it is useful as a migration, so that nodes prune long trees on
1814	// startup rather than waiting until importing the next epoch change block.
1815	prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1816
1817	let client_weak = Arc::downgrade(&client);
1818	let on_finality = move |summary: &FinalityNotification<Block>| {
1819		if let Some(client) = client_weak.upgrade() {
1820			aux_storage_cleanup(client.as_ref(), summary)
1821		} else {
1822			Default::default()
1823		}
1824	};
1825	client.register_finality_action(Box::new(on_finality));
1826
1827	let import = BabeBlockImport::new(
1828		client,
1829		epoch_changes,
1830		wrapped_block_import,
1831		config,
1832		create_inherent_data_providers,
1833		select_chain,
1834		offchain_tx_pool_factory,
1835	);
1836
1837	Ok((import, link))
1838}
1839
/// Parameters passed to [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
	/// The BABE link that is created by [`block_import`].
	pub link: BabeLink<Block>,
	/// The block import that should be wrapped. It should be the
	/// `BabeBlockImport` (or a wrapper of it) returned by [`block_import`].
	pub block_import: BI,
	/// Optional justification import.
	pub justification_import: Option<BoxJustificationImport<Block>>,
	/// The client to interact with the internals of the node.
	pub client: Arc<Client>,
	/// Slot duration used by the verifier when checking slot claims.
	pub slot_duration: SlotDuration,
	/// Spawner used to run the background worker answering epoch queries.
	pub spawner: &'a Spawn,
	/// Optional registry for Prometheus metrics.
	pub registry: Option<&'a Registry>,
	/// Optional telemetry handle to report telemetry events.
	pub telemetry: Option<TelemetryHandle>,
}
1859
1860/// Start an import queue for the BABE consensus algorithm.
1861///
1862/// This method returns the import queue, some data that needs to be passed to the block authoring
1863/// logic (`BabeLink`), and a future that must be run to
1864/// completion and is responsible for listening to finality notifications and
1865/// pruning the epoch changes tree.
1866///
1867/// The block import object provided must be the `BabeBlockImport` or a wrapper
1868/// of it, otherwise crucial import logic will be omitted.
1869pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1870	ImportQueueParams {
1871		link: babe_link,
1872		block_import,
1873		justification_import,
1874		client,
1875		slot_duration,
1876		spawner,
1877		registry,
1878		telemetry,
1879	}: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1880) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1881where
1882	BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1883	Client: ProvideRuntimeApi<Block>
1884		+ HeaderBackend<Block>
1885		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1886		+ AuxStore
1887		+ Send
1888		+ Sync
1889		+ 'static,
1890	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1891	Spawn: SpawnEssentialNamed,
1892{
1893	const HANDLE_BUFFER_SIZE: usize = 1024;
1894
1895	let verifier = BabeVerifier {
1896		slot_duration,
1897		config: babe_link.config.clone(),
1898		epoch_changes: babe_link.epoch_changes.clone(),
1899		telemetry,
1900		client: client.clone(),
1901	};
1902
1903	let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1904
1905	let answer_requests =
1906		answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1907
1908	spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1909
1910	Ok((
1911		BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1912		BabeWorkerHandle(worker_tx),
1913	))
1914}
1915
1916/// Reverts protocol aux data to at most the last finalized block.
1917/// In particular, epoch-changes and block weights announced after the revert
1918/// point are removed.
1919pub fn revert<Block, Client, Backend>(
1920	client: Arc<Client>,
1921	backend: Arc<Backend>,
1922	blocks: NumberFor<Block>,
1923) -> ClientResult<()>
1924where
1925	Block: BlockT,
1926	Client: AuxStore
1927		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1928		+ HeaderBackend<Block>
1929		+ ProvideRuntimeApi<Block>
1930		+ UsageProvider<Block>,
1931	Client::Api: BabeApi<Block>,
1932	Backend: BackendT<Block>,
1933{
1934	let best_number = client.info().best_number;
1935	let finalized = client.info().finalized_number;
1936
1937	let revertible = blocks.min(best_number - finalized);
1938	if revertible == Zero::zero() {
1939		return Ok(());
1940	}
1941
1942	let revert_up_to_number = best_number - revertible;
1943	let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1944		format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1945	))?;
1946
1947	// Revert epoch changes tree.
1948
1949	// This config is only used on-genesis.
1950	let config = configuration(&*client)?;
1951	let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1952	let mut epoch_changes = epoch_changes.shared_data();
1953
1954	if revert_up_to_number == Zero::zero() {
1955		// Special case, no epoch changes data were present on genesis.
1956		*epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1957	} else {
1958		epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1959	}
1960
1961	// Remove block weights added after the revert point.
1962
1963	let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1964
1965	let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1966		sp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1967			.map(|route| route.retracted().is_empty())
1968			.unwrap_or_default()
1969	});
1970
1971	for leaf in leaves {
1972		let mut hash = leaf;
1973		loop {
1974			let meta = client.header_metadata(hash)?;
1975			if meta.number <= revert_up_to_number ||
1976				!weight_keys.insert(aux_schema::block_weight_key(hash))
1977			{
1978				// We've reached the revert point or an already processed branch, stop here.
1979				break;
1980			}
1981			hash = meta.parent;
1982		}
1983	}
1984
1985	let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1986
1987	// Write epoch changes and remove weights in one shot.
1988	aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1989		client.insert_aux(values, weight_keys.iter())
1990	})
1991}
1992
1993fn query_epoch_changes<Block, Client>(
1994	epoch_changes: &SharedEpochChanges<Block, Epoch>,
1995	client: &Client,
1996	config: &BabeConfiguration,
1997	block_number: NumberFor<Block>,
1998	slot: Slot,
1999	parent_hash: Block::Hash,
2000) -> Result<
2001	(ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
2002	Error<Block>,
2003>
2004where
2005	Block: BlockT,
2006	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
2007{
2008	let epoch_changes = epoch_changes.shared_data();
2009	let epoch_descriptor = epoch_changes
2010		.epoch_descriptor_for_child_of(
2011			descendent_query(client),
2012			&parent_hash,
2013			block_number - 1u32.into(),
2014			slot,
2015		)
2016		.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
2017		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2018	let viable_epoch = epoch_changes
2019		.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2020		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2021	Ok((epoch_descriptor, viable_epoch.into_cloned()))
2022}