sc_consensus_babe/lib.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! # BABE (Blind Assignment for Blockchain Extension)
20//!
21//! BABE is a slot-based block production mechanism which uses a VRF PRNG to
22//! randomly perform the slot allocation. On every slot, all the authorities
23//! generate a new random number with the VRF function and if it is lower than a
24//! given threshold (which is proportional to their weight/stake) they have the
25//! right to produce a block. The proof of the VRF function execution will be
26//! used by other peers to validate the legitimacy of the slot claim.
27//!
28//! The engine is also responsible for collecting entropy on-chain which will be
29//! used to seed the given VRF PRNG. An epoch is a contiguous number of slots
30//! under which we will be using the same authority set. During an epoch all VRF
31//! outputs produced as a result of block production will be collected on an
32//! on-chain randomness pool. Epoch changes are announced one epoch in advance,
33//! i.e. when ending epoch N, we announce the parameters (randomness,
34//! authorities, etc.) for epoch N+2.
35//!
36//! Since the slot assignment is randomized, it is possible that a slot is
37//! assigned to multiple validators, in which case we will have a temporary fork,
38//! or that a slot is assigned to no validator, in which case no block is
39//! produced. This means that block times are not deterministic.
40//!
41//! The protocol has a parameter `c` in [0, 1] for which `1 - c` is the probability
42//! of a slot being empty. The choice of this parameter affects the security of
43//! the protocol with respect to the maximum tolerable network delay.
44//!
45//! In addition to the VRF-based slot assignment described above, which we will
46//! call primary slots, the engine also supports a deterministic secondary slot
47//! assignment. Primary slots take precedence over secondary slots: when
48//! authoring, the node starts by trying to claim a primary slot and falls back
49//! to a secondary slot claim attempt. The secondary slot assignment is done
50//! by picking the authority at index:
51//!
52//! `blake2_256(epoch_randomness ++ slot_number) % authorities_len`.
53//!
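//! As a purely illustrative sketch of that index computation (the helper name is
//! hypothetical and the actual selection logic lives in the `authorship` module,
//! which reduces the full 256-bit hash rather than a truncation of it):
//!
//! ```ignore
//! use codec::Encode;
//! use sp_core::blake2_256;
//!
//! fn secondary_author_index(randomness: [u8; 32], slot: u64, authorities_len: u64) -> u64 {
//!     // Hash the SCALE-encoded (randomness, slot) pair and reduce it modulo the
//!     // number of authorities.
//!     let hash = blake2_256(&(randomness, slot).encode());
//!     let mut word = [0u8; 8];
//!     word.copy_from_slice(&hash[..8]);
//!     u64::from_le_bytes(word) % authorities_len
//! }
//! ```
//!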
54//! Secondary slots support either a `SecondaryPlain` or a `SecondaryVRF`
55//! variant. Compared with the `SecondaryPlain` variant, the `SecondaryVRF` variant
56//! generates an additional VRF output. This output is not included in the beacon
57//! randomness, but can be consumed by parachains.
58//!
59//! The fork choice rule is weight-based, where weight equals the number of
60//! primary blocks in the chain. We will pick the heaviest chain (more primary
61//! blocks) and will go with the longest one in case of a tie.
62//!
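//! As a minimal sketch of that comparison, assume each candidate head is summarized
//! as a `(primary_block_count, chain_length)` pair (a hypothetical simplification,
//! not the actual fork-choice implementation):
//!
//! ```ignore
//! fn is_better(candidate: (u64, u64), current: (u64, u64)) -> bool {
//!     // Prefer the chain with more primary blocks; break ties by chain length.
//!     candidate.0 > current.0 || (candidate.0 == current.0 && candidate.1 > current.1)
//! }
//! ```
//!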
63//! An in-depth description and analysis of the protocol can be found here:
64//! <https://research.web3.foundation/Polkadot/protocols/block-production/Babe>
65
66#![forbid(unsafe_code)]
67#![warn(missing_docs)]
68
69use std::{
70	collections::HashSet,
71	future::Future,
72	ops::{Deref, DerefMut},
73	pin::Pin,
74	sync::Arc,
75	task::{Context, Poll},
76	time::Duration,
77};
78
79use codec::{Decode, Encode};
80use futures::{
81	channel::{
82		mpsc::{channel, Receiver, Sender},
83		oneshot,
84	},
85	prelude::*,
86};
87use log::{debug, info, log, trace, warn};
88use parking_lot::Mutex;
89use prometheus_endpoint::Registry;
90
91use sc_client_api::{
92	backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
93	PreCommitActions, UsageProvider,
94};
95use sc_consensus::{
96	block_import::{
97		BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
98		StateAction,
99	},
100	import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
101};
102use sc_consensus_epochs::{
103	descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
104	ViableEpochDescriptor,
105};
106use sc_consensus_slots::{
107	check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
108	SlotInfo, StorageChanges,
109};
110use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
111use sc_transaction_pool_api::OffchainTransactionPoolFactory;
112use sp_api::{ApiExt, ProvideRuntimeApi};
113use sp_application_crypto::AppCrypto;
114use sp_block_builder::BlockBuilder as BlockBuilderApi;
115use sp_blockchain::{
116	Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
117	Result as ClientResult,
118};
119use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
120use sp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
121use sp_consensus_slots::Slot;
122use sp_core::traits::SpawnEssentialNamed;
123use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
124use sp_keystore::KeystorePtr;
125use sp_runtime::{
126	generic::OpaqueDigestItemId,
127	traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
128	DigestItem,
129};
130
131pub use sc_consensus_slots::SlotProportion;
132pub use sp_consensus::SyncOracle;
133pub use sp_consensus_babe::{
134	digests::{
135		CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
136		PrimaryPreDigest, SecondaryPlainPreDigest,
137	},
138	AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
139	BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
140};
141
142pub use aux_schema::load_block_weight as block_weight;
143use sp_timestamp::Timestamp;
144
145mod migration;
146mod verification;
147
148pub mod authorship;
149pub mod aux_schema;
150#[cfg(test)]
151mod tests;
152
153const LOG_TARGET: &str = "babe";
154
155/// VRF context used for slots claiming lottery.
156const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";
157
158/// VRF output length for slots claiming lottery.
159const AUTHORING_SCORE_LENGTH: usize = 16;
160
161/// BABE epoch information
162#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
163pub struct Epoch(sp_consensus_babe::Epoch);
164
165impl Deref for Epoch {
166	type Target = sp_consensus_babe::Epoch;
167
168	fn deref(&self) -> &Self::Target {
169		&self.0
170	}
171}
172
173impl DerefMut for Epoch {
174	fn deref_mut(&mut self) -> &mut Self::Target {
175		&mut self.0
176	}
177}
178
179impl From<sp_consensus_babe::Epoch> for Epoch {
180	fn from(epoch: sp_consensus_babe::Epoch) -> Self {
181		Epoch(epoch)
182	}
183}
184
185impl EpochT for Epoch {
186	type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
187	type Slot = Slot;
188
189	fn increment(
190		&self,
191		(descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
192	) -> Epoch {
193		sp_consensus_babe::Epoch {
194			epoch_index: self.epoch_index + 1,
195			start_slot: self.start_slot + self.duration,
196			duration: self.duration,
197			authorities: descriptor.authorities,
198			randomness: descriptor.randomness,
199			config,
200		}
201		.into()
202	}
203
204	fn start_slot(&self) -> Slot {
205		self.start_slot
206	}
207
208	fn end_slot(&self) -> Slot {
209		self.start_slot + self.duration
210	}
211}
212
213impl Epoch {
214	/// Create the genesis epoch (epoch #0).
215	///
216	/// This is defined to start at the slot of the first block, so that slot has to be provided.
217	pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
218		sp_consensus_babe::Epoch {
219			epoch_index: 0,
220			start_slot: slot,
221			duration: genesis_config.epoch_length,
222			authorities: genesis_config.authorities.clone(),
223			randomness: genesis_config.randomness,
224			config: BabeEpochConfiguration {
225				c: genesis_config.c,
226				allowed_slots: genesis_config.allowed_slots,
227			},
228		}
229		.into()
230	}
231
232	/// Clone and tweak epoch information to refer to the specified slot.
233	///
234	/// All the information which depends on the slot value is recomputed and assigned
235	/// to the returned epoch instance.
236	///
237	/// The `slot` must be greater than or equal to the original epoch start slot;
238	/// if it is less, this operation is equivalent to a simple clone.
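	///
	/// For example (illustrative numbers only): cloning an epoch with `epoch_index = 3`,
	/// `start_slot = 100` and `duration = 10` for `slot = 125` skips
	/// `(125 - 100) / 10 = 2` epochs, yielding `epoch_index = 5` and `start_slot = 120`.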
239	pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
240		let mut epoch = self.clone();
241
242		let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;
243
244		let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
245			"epoch number is u64; it should be strictly smaller than number of slots; \
246				slots relate in some way to wall clock time; \
247				if u64 is not enough we should crash for safety; qed.",
248		);
249
250		let start_slot = skipped_epochs
251			.checked_mul(epoch.duration)
252			.and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
253			.expect(
254				"slot number is u64; it should relate in some way to wall clock time; \
255				 if u64 is not enough we should crash for safety; qed.",
256			);
257
258		epoch.epoch_index = epoch_index;
259		epoch.start_slot = Slot::from(start_slot);
260
261		epoch
262	}
263}
264
265/// Errors encountered by the babe authorship task.
266#[derive(Debug, thiserror::Error)]
267pub enum Error<B: BlockT> {
268	/// Multiple BABE pre-runtime digests
269	#[error("Multiple BABE pre-runtime digests, rejecting!")]
270	MultiplePreRuntimeDigests,
271	/// No BABE pre-runtime digest found
272	#[error("No BABE pre-runtime digest found")]
273	NoPreRuntimeDigest,
274	/// Multiple BABE epoch change digests
275	#[error("Multiple BABE epoch change digests, rejecting!")]
276	MultipleEpochChangeDigests,
277	/// Multiple BABE config change digests
278	#[error("Multiple BABE config change digests, rejecting!")]
279	MultipleConfigChangeDigests,
280	/// Could not extract timestamp and slot
281	#[error("Could not extract timestamp and slot: {0}")]
282	Extraction(ConsensusError),
283	/// Could not fetch epoch
284	#[error("Could not fetch epoch at {0:?}")]
285	FetchEpoch(B::Hash),
286	/// Header rejected: too far in the future
287	#[error("Header {0:?} rejected: too far in the future")]
288	TooFarInFuture(B::Hash),
289	/// Parent unavailable. Cannot import
290	#[error("Parent ({0}) of {1} unavailable. Cannot import")]
291	ParentUnavailable(B::Hash, B::Hash),
292	/// Slot number must increase
293	#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
294	SlotMustIncrease(Slot, Slot),
295	/// Header has a bad seal
296	#[error("Header {0:?} has a bad seal")]
297	HeaderBadSeal(B::Hash),
298	/// Header is unsealed
299	#[error("Header {0:?} is unsealed")]
300	HeaderUnsealed(B::Hash),
301	/// Slot author not found
302	#[error("Slot author not found")]
303	SlotAuthorNotFound,
304	/// Secondary slot assignments are disabled for the current epoch.
305	#[error("Secondary slot assignments are disabled for the current epoch.")]
306	SecondarySlotAssignmentsDisabled,
307	/// Bad signature
308	#[error("Bad signature on {0:?}")]
309	BadSignature(B::Hash),
310	/// Invalid author: Expected secondary author
311	#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
312	InvalidAuthor(AuthorityId, AuthorityId),
313	/// No secondary author expected.
314	#[error("No secondary author expected.")]
315	NoSecondaryAuthorExpected,
316	/// VRF verification failed
317	#[error("VRF verification failed")]
318	VrfVerificationFailed,
319	/// Primary slot threshold too low
320	#[error("VRF output rejected, threshold {0} exceeded")]
321	VrfThresholdExceeded(u128),
322	/// Could not fetch parent header
323	#[error("Could not fetch parent header: {0}")]
324	FetchParentHeader(sp_blockchain::Error),
325	/// Expected epoch change to happen.
326	#[error("Expected epoch change to happen at {0:?}, slot {1}")]
327	ExpectedEpochChange(B::Hash, Slot),
328	/// Unexpected config change.
329	#[error("Unexpected config change")]
330	UnexpectedConfigChange,
331	/// Unexpected epoch change
332	#[error("Unexpected epoch change")]
333	UnexpectedEpochChange,
334	/// Parent block has no associated weight
335	#[error("Parent block of {0} has no associated weight")]
336	ParentBlockNoAssociatedWeight(B::Hash),
337	/// Check inherents error
338	#[error("Checking inherents failed: {0}")]
339	CheckInherents(sp_inherents::Error),
340	/// Unhandled check inherents error
341	#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
342	CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
343	/// Create inherents error.
344	#[error("Creating inherents failed: {0}")]
345	CreateInherents(sp_inherents::Error),
346	/// Background worker is not running and therefore requests cannot be answered.
347	#[error("Background worker is not running")]
348	BackgroundWorkerTerminated,
349	/// Client error
350	#[error(transparent)]
351	Client(sp_blockchain::Error),
352	/// Runtime Api error.
353	#[error(transparent)]
354	RuntimeApi(sp_api::ApiError),
355	/// Fork tree error
356	#[error(transparent)]
357	ForkTree(Box<fork_tree::Error<sp_blockchain::Error>>),
358}
359
360impl<B: BlockT> From<Error<B>> for String {
361	fn from(error: Error<B>) -> String {
362		error.to_string()
363	}
364}
365
366fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
367	debug!(target: LOG_TARGET, "{}", error);
368	error
369}
370
371/// Intermediate value passed to block importer.
372pub struct BabeIntermediate<B: BlockT> {
373	/// The epoch descriptor.
374	pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
375}
376
377/// Intermediate key for Babe engine.
378pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
379
380/// Read the BABE configuration from the runtime state at the current best block.
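///
/// A minimal usage sketch, assuming a `client` value that satisfies the bounds below:
///
/// ```ignore
/// // Read the active BABE configuration and derive the slot duration from it.
/// let babe_config = sc_consensus_babe::configuration(&*client)?;
/// let slot_duration = babe_config.slot_duration();
/// ```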
381pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
382where
383	C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
384	C::Api: BabeApi<B>,
385{
386	let at_hash = if client.usage_info().chain.finalized_state.is_some() {
387		client.usage_info().chain.best_hash
388	} else {
389		debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
390		client.usage_info().chain.genesis_hash
391	};
392
393	let runtime_api = client.runtime_api();
394	let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
395
396	let config = match version {
397		Some(1) => {
398			#[allow(deprecated)]
399			{
400				runtime_api.configuration_before_version_2(at_hash)?.into()
401			}
402		},
403		Some(2) => runtime_api.configuration(at_hash)?,
404		_ =>
405			return Err(sp_blockchain::Error::VersionInvalid(
406				"Unsupported or invalid BabeApi version".to_string(),
407			)),
408	};
409	Ok(config)
410}
411
412/// Parameters for BABE.
413pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
414	/// The keystore that manages the keys of the node.
415	pub keystore: KeystorePtr,
416
417	/// The client to use
418	pub client: Arc<C>,
419
420	/// The `SelectChain` strategy
421	pub select_chain: SC,
422
423	/// The environment we are producing blocks for.
424	pub env: E,
425
426	/// The underlying block-import object to supply our produced blocks to.
427	/// This must be a `BabeBlockImport` or a wrapper of it, otherwise
428	/// critical consensus logic will be omitted.
429	pub block_import: I,
430
431	/// A sync oracle
432	pub sync_oracle: SO,
433
434	/// Hook into the sync module to control the justification sync process.
435	pub justification_sync_link: L,
436
437	/// Something that can create the inherent data providers.
438	pub create_inherent_data_providers: CIDP,
439
440	/// Force authoring of blocks even if we are offline
441	pub force_authoring: bool,
442
443	/// Strategy and parameters for backing off block production.
444	pub backoff_authoring_blocks: Option<BS>,
445
446	/// The source of timestamps for relative slots
447	pub babe_link: BabeLink<B>,
448
449	/// The proportion of the slot dedicated to proposing.
450	///
451	/// The block proposing will be limited to this proportion of the slot from the start of the
452	/// slot. However, the proposing can still take longer when there is some lenience factor
453	/// applied, because there were no blocks produced for some slots.
454	pub block_proposal_slot_portion: SlotProportion,
455
456	/// The maximum proportion of the slot dedicated to proposing with any lenience factor applied
457	/// due to no blocks being produced.
458	pub max_block_proposal_slot_portion: Option<SlotProportion>,
459
460	/// Handle used to report telemetry.
461	pub telemetry: Option<TelemetryHandle>,
462}
463
464/// Start the babe worker.
465pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
466	BabeParams {
467		keystore,
468		client,
469		select_chain,
470		env,
471		block_import,
472		sync_oracle,
473		justification_sync_link,
474		create_inherent_data_providers,
475		force_authoring,
476		backoff_authoring_blocks,
477		babe_link,
478		block_proposal_slot_portion,
479		max_block_proposal_slot_portion,
480		telemetry,
481	}: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
482) -> Result<BabeWorker<B>, ConsensusError>
483where
484	B: BlockT,
485	C: ProvideRuntimeApi<B>
486		+ HeaderBackend<B>
487		+ HeaderMetadata<B, Error = ClientError>
488		+ Send
489		+ Sync
490		+ 'static,
491	C::Api: BabeApi<B>,
492	SC: SelectChain<B> + 'static,
493	E: Environment<B, Error = Error> + Send + Sync + 'static,
494	E::Proposer: Proposer<B, Error = Error>,
495	I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
496	SO: SyncOracle + Send + Sync + Clone + 'static,
497	L: sc_consensus::JustificationSyncLink<B> + 'static,
498	CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
499	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
500	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
501	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
502{
503	let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
504
505	let worker = BabeSlotWorker {
506		client: client.clone(),
507		block_import,
508		env,
509		sync_oracle: sync_oracle.clone(),
510		justification_sync_link,
511		force_authoring,
512		backoff_authoring_blocks,
513		keystore,
514		epoch_changes: babe_link.epoch_changes.clone(),
515		slot_notification_sinks: slot_notification_sinks.clone(),
516		config: babe_link.config.clone(),
517		block_proposal_slot_portion,
518		max_block_proposal_slot_portion,
519		telemetry,
520	};
521
522	info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");
523
524	let slot_worker = sc_consensus_slots::start_slot_worker(
525		babe_link.config.slot_duration(),
526		select_chain,
527		sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
528		sync_oracle,
529		create_inherent_data_providers,
530	);
531
532	Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
533}
534
535// Remove obsolete block weight data by leveraging finality notifications.
536// This includes data for all finalized blocks (excluding the most recent one)
537// and all stale branches.
538fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
539	client: &C,
540	notification: &FinalityNotification<Block>,
541) -> AuxDataOperations {
542	let mut hashes = HashSet::new();
543
544	let first = notification.tree_route.first().unwrap_or(&notification.hash);
545	match client.header_metadata(*first) {
546		Ok(meta) => {
547			hashes.insert(meta.parent);
548		},
549		Err(err) => {
550			warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
551		},
552	}
553
554	// Cleans data for finalized block's ancestors
555	hashes.extend(
556		notification
557			.tree_route
558			.iter()
559			// Ensure we don't prune latest finalized block.
560			// This should not happen, but better be safe than sorry!
561			.filter(|h| **h != notification.hash),
562	);
563
564	hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));
565
566	hashes
567		.into_iter()
568		.map(|val| (aux_schema::block_weight_key(val), None))
569		.collect()
570}
571
572async fn answer_requests<B: BlockT, C>(
573	mut request_rx: Receiver<BabeRequest<B>>,
574	config: BabeConfiguration,
575	client: Arc<C>,
576	epoch_changes: SharedEpochChanges<B, Epoch>,
577) where
578	C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
579{
580	while let Some(request) = request_rx.next().await {
581		match request {
582			BabeRequest::EpochData(response) => {
583				let _ = response.send(epoch_changes.shared_data().clone());
584			},
585			BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
586				let lookup = || {
587					let epoch_changes = epoch_changes.shared_data();
588					epoch_changes
589						.epoch_data_for_child_of(
590							descendent_query(&*client),
591							&parent_hash,
592							parent_number,
593							slot,
594							|slot| Epoch::genesis(&config, slot),
595						)
596						.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
597						.ok_or(Error::<B>::FetchEpoch(parent_hash))
598				};
599
600				let _ = response.send(lookup());
601			},
602		}
603	}
604}
605
606/// Requests to the BABE service.
607enum BabeRequest<B: BlockT> {
608	/// Request all available epoch data.
609	EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
610	/// Request the epoch that a child of the given block, with the given slot number, would have.
611	///
612	/// The parent block is identified by its hash and number.
613	EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
614}
615
616/// A handle to the BABE worker for issuing requests.
617#[derive(Clone)]
618pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
619
620impl<B: BlockT> BabeWorkerHandle<B> {
621	async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
622		match self.0.clone().send(request).await {
623			Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
624			Err(err) => warn!(
625				target: LOG_TARGET,
626				"Unhandled error when sending request to worker: {:?}", err
627			),
628			_ => {},
629		}
630
631		Ok(())
632	}
633
634	/// Fetch all available epoch data.
635	pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
636		let (tx, rx) = oneshot::channel();
637		self.send_request(BabeRequest::EpochData(tx)).await?;
638
639		rx.await.or(Err(Error::BackgroundWorkerTerminated))
640	}
641
642	/// Fetch the epoch that a child of the given block, with the given slot number, would have.
643	///
644	/// The parent block is identified by its hash and number.
645	pub async fn epoch_data_for_child_of(
646		&self,
647		parent_hash: B::Hash,
648		parent_number: NumberFor<B>,
649		slot: Slot,
650	) -> Result<Epoch, Error<B>> {
651		let (tx, rx) = oneshot::channel();
652		self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
653			.await?;
654
655		rx.await.or(Err(Error::BackgroundWorkerTerminated))?
656	}
657}
658
659/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
660#[must_use]
661pub struct BabeWorker<B: BlockT> {
662	inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
663	slot_notification_sinks: SlotNotificationSinks<B>,
664}
665
666impl<B: BlockT> BabeWorker<B> {
667	/// Return an event stream of notifications for when a new slot happens, and the corresponding
668	/// epoch descriptor.
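	///
	/// A hedged consumption sketch, assuming an async context and a `worker` value of
	/// type `BabeWorker<Block>` returned by `start_babe`:
	///
	/// ```ignore
	/// let mut notifications = worker.slot_notification_stream();
	/// while let Some((slot, _epoch_descriptor)) = notifications.next().await {
	///     // React to the newly started slot here.
	///     log::debug!("entering slot {:?}", slot);
	/// }
	/// ```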
669	pub fn slot_notification_stream(
670		&self,
671	) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
672		const CHANNEL_BUFFER_SIZE: usize = 1024;
673
674		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
675		self.slot_notification_sinks.lock().push(sink);
676		stream
677	}
678}
679
680impl<B: BlockT> Future for BabeWorker<B> {
681	type Output = ();
682
683	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
684		self.inner.as_mut().poll(cx)
685	}
686}
687
688/// Slot notification sinks.
689type SlotNotificationSinks<B> = Arc<
690	Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
691>;
692
693struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
694	client: Arc<C>,
695	block_import: I,
696	env: E,
697	sync_oracle: SO,
698	justification_sync_link: L,
699	force_authoring: bool,
700	backoff_authoring_blocks: Option<BS>,
701	keystore: KeystorePtr,
702	epoch_changes: SharedEpochChanges<B, Epoch>,
703	slot_notification_sinks: SlotNotificationSinks<B>,
704	config: BabeConfiguration,
705	block_proposal_slot_portion: SlotProportion,
706	max_block_proposal_slot_portion: Option<SlotProportion>,
707	telemetry: Option<TelemetryHandle>,
708}
709
710#[async_trait::async_trait]
711impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
712	for BabeSlotWorker<B, C, E, I, SO, L, BS>
713where
714	B: BlockT,
715	C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
716	C::Api: BabeApi<B>,
717	E: Environment<B, Error = Error> + Send + Sync,
718	E::Proposer: Proposer<B, Error = Error>,
719	I: BlockImport<B> + Send + Sync + 'static,
720	SO: SyncOracle + Send + Clone + Sync,
721	L: sc_consensus::JustificationSyncLink<B>,
722	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
723	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
724{
725	type Claim = (PreDigest, AuthorityId);
726	type SyncOracle = SO;
727	type JustificationSyncLink = L;
728	type CreateProposer =
729		Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
730	type Proposer = E::Proposer;
731	type BlockImport = I;
732	type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;
733
734	fn logging_target(&self) -> &'static str {
735		LOG_TARGET
736	}
737
738	fn block_import(&mut self) -> &mut Self::BlockImport {
739		&mut self.block_import
740	}
741
742	fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
743		self.epoch_changes
744			.shared_data()
745			.epoch_descriptor_for_child_of(
746				descendent_query(&*self.client),
747				&parent.hash(),
748				*parent.number(),
749				slot,
750			)
751			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
752			.ok_or(ConsensusError::InvalidAuthoritiesSet)
753	}
754
755	fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
756		self.epoch_changes
757			.shared_data()
758			.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
759			.map(|epoch| epoch.as_ref().authorities.len())
760	}
761
762	async fn claim_slot(
763		&mut self,
764		_parent_header: &B::Header,
765		slot: Slot,
766		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
767	) -> Option<Self::Claim> {
768		debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
769		let s = authorship::claim_slot(
770			slot,
771			self.epoch_changes
772				.shared_data()
773				.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
774				.as_ref(),
775			&self.keystore,
776		);
777
778		if s.is_some() {
779			debug!(target: LOG_TARGET, "Claimed slot {}", slot);
780		}
781
782		s
783	}
784
785	fn notify_slot(
786		&self,
787		_parent_header: &B::Header,
788		slot: Slot,
789		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
790	) {
791		let sinks = &mut self.slot_notification_sinks.lock();
792		sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
793			Ok(()) => true,
794			Err(e) =>
795				if e.is_full() {
796					warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
797					true
798				} else {
799					false
800				},
801		});
802	}
803
804	fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<sp_runtime::DigestItem> {
805		vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
806	}
807
808	async fn block_import_params(
809		&self,
810		header: B::Header,
811		header_hash: &B::Hash,
812		body: Vec<B::Extrinsic>,
813		storage_changes: StorageChanges<B>,
814		(_, public): Self::Claim,
815		epoch_descriptor: Self::AuxData,
816	) -> Result<BlockImportParams<B>, ConsensusError> {
817		let signature = self
818			.keystore
819			.sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
820			.map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
821			.ok_or_else(|| {
822				ConsensusError::CannotSign(format!(
823					"Could not find key in keystore. Key: {:?}",
824					public
825				))
826			})?;
827
828		let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());
829
830		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
831		import_block.post_digests.push(digest_item);
832		import_block.body = Some(body);
833		import_block.state_action =
834			StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
835		import_block
836			.insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });
837
838		Ok(import_block)
839	}
840
841	fn force_authoring(&self) -> bool {
842		self.force_authoring
843	}
844
845	fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
846		if let Some(ref strategy) = self.backoff_authoring_blocks {
847			if let Ok(chain_head_slot) =
848				find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
849			{
850				return strategy.should_backoff(
851					*chain_head.number(),
852					chain_head_slot,
853					self.client.info().finalized_number,
854					slot,
855					self.logging_target(),
856				)
857			}
858		}
859		false
860	}
861
862	fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
863		&mut self.sync_oracle
864	}
865
866	fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
867		&mut self.justification_sync_link
868	}
869
870	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
871		Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
872	}
873
874	fn telemetry(&self) -> Option<TelemetryHandle> {
875		self.telemetry.clone()
876	}
877
878	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
879		let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());
880
881		sc_consensus_slots::proposing_remaining_duration(
882			parent_slot,
883			slot_info,
884			&self.block_proposal_slot_portion,
885			self.max_block_proposal_slot_portion.as_ref(),
886			sc_consensus_slots::SlotLenienceType::Exponential,
887			self.logging_target(),
888		)
889	}
890}
891
892/// Extract the BABE pre digest from the given header. Pre-runtime digests are
893/// mandatory; the function will return `Err` if none is found.
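///
/// A minimal usage sketch (the `header` variable is hypothetical):
///
/// ```ignore
/// let pre_digest = find_pre_digest::<Block>(&header)?;
/// let slot = pre_digest.slot();
/// let authority_index = pre_digest.authority_index();
/// ```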
894pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
895	// genesis block doesn't contain a pre digest so let's generate a
896	// dummy one to not break any invariants in the rest of the code
897	if header.number().is_zero() {
898		return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
899			slot: 0.into(),
900			authority_index: 0,
901		}))
902	}
903
904	let mut pre_digest: Option<_> = None;
905	for log in header.digest().logs() {
906		trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
907		match (log.as_babe_pre_digest(), pre_digest.is_some()) {
908			(Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
909			(None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
910			(s, false) => pre_digest = s,
911		}
912	}
913	pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
914}
915
916/// Check whether the given header contains a BABE epoch change digest.
917pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
918	find_next_epoch_digest::<B>(header).ok().flatten().is_some()
919}
920
921/// Extract the BABE epoch change digest from the given header, if it exists.
922pub fn find_next_epoch_digest<B: BlockT>(
923	header: &B::Header,
924) -> Result<Option<NextEpochDescriptor>, Error<B>> {
925	let mut epoch_digest: Option<_> = None;
926	for log in header.digest().logs() {
927		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
928		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
929		match (log, epoch_digest.is_some()) {
930			(Some(ConsensusLog::NextEpochData(_)), true) =>
931				return Err(babe_err(Error::MultipleEpochChangeDigests)),
932			(Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
933			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
934		}
935	}
936
937	Ok(epoch_digest)
938}
939
940/// Extract the BABE config change digest from the given header, if it exists.
941fn find_next_config_digest<B: BlockT>(
942	header: &B::Header,
943) -> Result<Option<NextConfigDescriptor>, Error<B>> {
944	let mut config_digest: Option<_> = None;
945	for log in header.digest().logs() {
946		trace!(target: LOG_TARGET, "Checking log {:?}, looking for config change digest.", log);
947		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
948		match (log, config_digest.is_some()) {
949			(Some(ConsensusLog::NextConfigData(_)), true) =>
950				return Err(babe_err(Error::MultipleConfigChangeDigests)),
951			(Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
952			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
953		}
954	}
955
956	Ok(config_digest)
957}
958
959/// State that must be shared between the import queue and the authoring logic.
960#[derive(Clone)]
961pub struct BabeLink<Block: BlockT> {
962	epoch_changes: SharedEpochChanges<Block, Epoch>,
963	config: BabeConfiguration,
964}
965
966impl<Block: BlockT> BabeLink<Block> {
967	/// Get the epoch changes of this link.
968	pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
969		&self.epoch_changes
970	}
971
972	/// Get the config of this link.
973	pub fn config(&self) -> &BabeConfiguration {
974		&self.config
975	}
976}
977
978/// A verifier for Babe blocks.
979pub struct BabeVerifier<Block: BlockT, Client> {
980	client: Arc<Client>,
981	slot_duration: SlotDuration,
982	config: BabeConfiguration,
983	epoch_changes: SharedEpochChanges<Block, Epoch>,
984	telemetry: Option<TelemetryHandle>,
985}
986
987#[async_trait::async_trait]
988impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
989where
990	Block: BlockT,
991	Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
992		+ HeaderBackend<Block>
993		+ ProvideRuntimeApi<Block>
994		+ Send
995		+ Sync
996		+ AuxStore,
997	Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
998{
999	async fn verify(
1000		&self,
1001		mut block: BlockImportParams<Block>,
1002	) -> Result<BlockImportParams<Block>, String> {
1003		trace!(
1004			target: LOG_TARGET,
1005			"Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
1006			block.origin,
1007			block.header,
1008			block.justifications,
1009			block.body,
1010		);
1011
1012		let hash = block.header.hash();
1013		let parent_hash = *block.header.parent_hash();
1014
1015		let number = block.header.number();
1016
1017		if is_state_sync_or_gap_sync_import(&*self.client, &block) {
1018			return Ok(block)
1019		}
1020
1021		debug!(
1022			target: LOG_TARGET,
1023			"We have {:?} logs in this header",
1024			block.header.digest().logs().len()
1025		);
1026
1027		let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);
1028
1029		let pre_digest = find_pre_digest::<Block>(&block.header)?;
1030		let (check_header, epoch_descriptor) = {
1031			let (epoch_descriptor, viable_epoch) = query_epoch_changes(
1032				&self.epoch_changes,
1033				self.client.as_ref(),
1034				&self.config,
1035				*number,
1036				pre_digest.slot(),
1037				parent_hash,
1038			)?;
1039
1040			// We add one to the current slot to allow for some small drift.
1041			// FIXME #1019 in the future, alter this queue to allow deferring of headers
1042			let v_params = verification::VerificationParams {
1043				header: block.header.clone(),
1044				pre_digest: Some(pre_digest),
1045				slot_now: slot_now + 1,
1046				epoch: viable_epoch.as_ref(),
1047			};
1048
1049			(verification::check_header::<Block>(v_params)?, epoch_descriptor)
1050		};
1051
1052		match check_header {
1053			CheckedHeader::Checked(pre_header, verified_info) => {
1054				trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
1055				telemetry!(
1056					self.telemetry;
1057					CONSENSUS_TRACE;
1058					"babe.checked_and_importing";
1059					"pre_header" => ?pre_header,
1060				);
1061
1062				block.header = pre_header;
1063				block.post_digests.push(verified_info.seal);
1064				block.insert_intermediate(
1065					INTERMEDIATE_KEY,
1066					BabeIntermediate::<Block> { epoch_descriptor },
1067				);
1068				block.post_hash = Some(hash);
1069
1070				Ok(block)
1071			},
1072			CheckedHeader::Deferred(a, b) => {
1073				debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
1074				telemetry!(
1075					self.telemetry;
1076					CONSENSUS_DEBUG;
1077					"babe.header_too_far_in_future";
1078					"hash" => ?hash, "a" => ?a, "b" => ?b
1079				);
1080				Err(Error::<Block>::TooFarInFuture(hash).into())
1081			},
1082		}
1083	}
1084}
1085
1086/// Verification for imported blocks is skipped in two cases:
1087/// 1. When importing blocks below the last finalized block during network initial synchronization.
1088/// 2. When importing whole state we don't calculate epoch descriptor, but rather read it from the
1089///    state after import. We also skip all verifications because there's no parent state and we
1090///    trust the sync module to verify that the state is correct and finalized.
1091fn is_state_sync_or_gap_sync_import<B: BlockT>(
1092	client: &impl HeaderBackend<B>,
1093	block: &BlockImportParams<B>,
1094) -> bool {
1095	let number = *block.header.number();
1096	let info = client.info();
1097	info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
1098		block.with_state()
1099}
1100
1101/// A block-import handler for BABE.
1102///
1103/// This scans each imported block for epoch change signals. The signals are
1104/// tracked in a tree (of all forks), and the import logic validates all epoch
1105/// change transitions, i.e. whether a given epoch change is expected or whether
1106/// it is missing.
1107///
1108/// The epoch change tree should be pruned as blocks are finalized.
1109pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
1110	inner: I,
1111	client: Arc<Client>,
1112	epoch_changes: SharedEpochChanges<Block, Epoch>,
1113	create_inherent_data_providers: CIDP,
1114	config: BabeConfiguration,
1115	// A [`SelectChain`] implementation.
1116	//
1117	// Used to determine the best block that should be used as basis when sending an equivocation
1118	// report.
1119	select_chain: SC,
1120	// The offchain transaction pool factory.
1121	//
1122	// Will be used when sending equivocation reports.
1123	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1124}
1125
1126impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
1127	for BabeBlockImport<Block, Client, I, CIDP, SC>
1128{
1129	fn clone(&self) -> Self {
1130		BabeBlockImport {
1131			inner: self.inner.clone(),
1132			client: self.client.clone(),
1133			epoch_changes: self.epoch_changes.clone(),
1134			config: self.config.clone(),
1135			create_inherent_data_providers: self.create_inherent_data_providers.clone(),
1136			select_chain: self.select_chain.clone(),
1137			offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
1138		}
1139	}
1140}
1141
1142impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
1143	fn new(
1144		client: Arc<Client>,
1145		epoch_changes: SharedEpochChanges<Block, Epoch>,
1146		block_import: I,
1147		config: BabeConfiguration,
1148		create_inherent_data_providers: CIDP,
1149		select_chain: SC,
1150		offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1151	) -> Self {
1152		BabeBlockImport {
1153			client,
1154			inner: block_import,
1155			epoch_changes,
1156			config,
1157			create_inherent_data_providers,
1158			select_chain,
1159			offchain_tx_pool_factory,
1160		}
1161	}
1162}
1163
1164impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
1165where
1166	Block: BlockT,
1167	Inner: BlockImport<Block> + Send + Sync,
1168	Inner::Error: Into<ConsensusError>,
1169	Client: HeaderBackend<Block>
1170		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1171		+ AuxStore
1172		+ ProvideRuntimeApi<Block>
1173		+ Send
1174		+ Sync,
1175	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1176	CIDP: CreateInherentDataProviders<Block, ()>,
1177	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
1178	SC: sp_consensus::SelectChain<Block> + 'static,
1179{
1180	/// Import the whole state after warp sync.
1181	// This function makes multiple transactions to the DB. If one of them fails we may
1182	// end up in an inconsistent state and have to resync.
1183	async fn import_state(
1184		&self,
1185		mut block: BlockImportParams<Block>,
1186	) -> Result<ImportResult, ConsensusError> {
1187		let hash = block.post_hash();
1188		let parent_hash = *block.header.parent_hash();
1189		let number = *block.header.number();
1190
1191		block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
1192		// Reset block weight.
1193		aux_schema::write_block_weight(hash, 0, |values| {
1194			block
1195				.auxiliary
1196				.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1197		});
1198
1199		// First make the client import the state.
1200		let import_result = self.inner.import_block(block).await;
1201		let aux = match import_result {
1202			Ok(ImportResult::Imported(aux)) => aux,
1203			Ok(r) =>
1204				return Err(ConsensusError::ClientImport(format!(
1205					"Unexpected import result: {:?}",
1206					r
1207				))),
1208			Err(r) => return Err(r.into()),
1209		};
1210
1211		// Read epoch info from the imported state.
1212		let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
1213			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1214		})?;
1215		let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
1216			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1217		})?;
1218
1219		let mut epoch_changes = self.epoch_changes.shared_data_locked();
1220		epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
1221		aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
1222			self.client.insert_aux(insert, [])
1223		})
1224		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1225
1226		Ok(ImportResult::Imported(aux))
1227	}
1228
1229	/// Check the inherents and equivocations.
1230	async fn check_inherents_and_equivocations(
1231		&self,
1232		block: &mut BlockImportParams<Block>,
1233	) -> Result<(), ConsensusError> {
1234		if is_state_sync_or_gap_sync_import(&*self.client, block) {
1235			return Ok(())
1236		}
1237
1238		let parent_hash = *block.header.parent_hash();
1239		let number = *block.header.number();
1240
1241		let create_inherent_data_providers = self
1242			.create_inherent_data_providers
1243			.create_inherent_data_providers(parent_hash, ())
1244			.await?;
1245
1246		let slot_now = create_inherent_data_providers.slot();
1247
1248		let babe_pre_digest = find_pre_digest::<Block>(&block.header)
1249			.map_err(|e| ConsensusError::Other(Box::new(e)))?;
1250		let slot = babe_pre_digest.slot();
1251
1252		// Check inherents.
1253		self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
1254			.await?;
1255
1256		// Check for equivocation and report it to the runtime if needed.
1257		let author = {
1258			let viable_epoch = query_epoch_changes(
1259				&self.epoch_changes,
1260				self.client.as_ref(),
1261				&self.config,
1262				number,
1263				slot,
1264				parent_hash,
1265			)
1266			.map_err(|e| ConsensusError::Other(babe_err(e).into()))?
1267			.1;
1268			match viable_epoch
1269				.as_ref()
1270				.authorities
1271				.get(babe_pre_digest.authority_index() as usize)
1272			{
1273				Some(author) => author.0.clone(),
1274				None =>
1275					return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into())),
1276			}
1277		};
1278		if let Err(err) = self
1279			.check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
1280			.await
1281		{
1282			warn!(
1283				target: LOG_TARGET,
1284				"Error checking/reporting BABE equivocation: {}", err
1285			);
1286		}
1287		Ok(())
1288	}
1289
1290	async fn check_inherents(
1291		&self,
1292		block: &mut BlockImportParams<Block>,
1293		at_hash: Block::Hash,
1294		slot: Slot,
1295		create_inherent_data_providers: CIDP::InherentDataProviders,
1296	) -> Result<(), ConsensusError> {
1297		if block.state_action.skip_execution_checks() {
1298			return Ok(())
1299		}
1300
1301		if let Some(inner_body) = block.body.take() {
1302			let new_block = Block::new(block.header.clone(), inner_body);
1303			// if the body is passed through and the block was executed,
1304			// we need to use the runtime to check that the internally-set
1305			// timestamp in the inherents actually matches the slot set in the seal.
1306			let mut inherent_data = create_inherent_data_providers
1307				.create_inherent_data()
1308				.await
1309				.map_err(|e| ConsensusError::Other(Box::new(e)))?;
1310			inherent_data.babe_replace_inherent_data(slot);
1311
1312			use sp_block_builder::CheckInherentsError;
1313
1314			sp_block_builder::check_inherents_with_data(
1315				self.client.clone(),
1316				at_hash,
1317				new_block.clone(),
1318				&create_inherent_data_providers,
1319				inherent_data,
1320			)
1321			.await
1322			.map_err(|e| {
1323				ConsensusError::Other(Box::new(match e {
1324					CheckInherentsError::CreateInherentData(e) =>
1325						Error::<Block>::CreateInherents(e),
1326					CheckInherentsError::Client(e) => Error::RuntimeApi(e),
1327					CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
1328					CheckInherentsError::CheckInherentsUnknownError(id) =>
1329						Error::CheckInherentsUnhandled(id),
1330				}))
1331			})?;
1332			let (_, inner_body) = new_block.deconstruct();
1333			block.body = Some(inner_body);
1334		}
1335
1336		Ok(())
1337	}
1338
1339	async fn check_and_report_equivocation(
1340		&self,
1341		slot_now: Slot,
1342		slot: Slot,
1343		header: &Block::Header,
1344		author: &AuthorityId,
1345		origin: &BlockOrigin,
1346	) -> Result<(), Error<Block>> {
1347		// don't report any equivocations during initial sync
1348		// as they are most likely stale.
1349		if *origin == BlockOrigin::NetworkInitialSync {
1350			return Ok(())
1351		}
1352
1353		// check if authorship of this header is an equivocation and return a proof if so.
1354		let Some(equivocation_proof) =
1355			check_equivocation(&*self.client, slot_now, slot, header, author)
1356				.map_err(Error::Client)?
1357		else {
1358			return Ok(())
1359		};
1360
1361		info!(
1362			"Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
1363			author,
1364			slot,
1365			equivocation_proof.first_header.hash(),
1366			equivocation_proof.second_header.hash(),
1367		);
1368
1369		// get the best block on which we will build and send the equivocation report.
1370		let best_hash = self
1371			.select_chain
1372			.best_chain()
1373			.await
1374			.map(|h| h.hash())
1375			.map_err(|e| Error::Client(e.into()))?;
1376
1377		// generate a key ownership proof. we start by trying to generate the
1378		// key ownership proof at the parent of the equivocating header, this
1379		// will make sure that proof generation is successful since it happens
1380		// during the on-going session (i.e. session keys are available in the
1381		// state to be able to generate the proof). this might fail if the
1382		// equivocation happens on the first block of the session, in which case
1383		// its parent would be on the previous session. if generation on the
1384		// parent header fails we try with best block as well.
1385		let generate_key_owner_proof = |at_hash: Block::Hash| {
1386			self.client
1387				.runtime_api()
1388				.generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
1389				.map_err(Error::RuntimeApi)
1390		};
1391
1392		let parent_hash = *header.parent_hash();
1393		let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
1394			Some(proof) => proof,
1395			None => match generate_key_owner_proof(best_hash)? {
1396				Some(proof) => proof,
1397				None => {
1398					debug!(
1399						target: LOG_TARGET,
1400						"Equivocation offender is not part of the authority set."
1401					);
1402					return Ok(())
1403				},
1404			},
1405		};
1406
1407		// submit equivocation report at best block.
1408		let mut runtime_api = self.client.runtime_api();
1409
1410		// Register the offchain tx pool to be able to use it from the runtime.
1411		runtime_api
1412			.register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
1413
1414		runtime_api
1415			.submit_report_equivocation_unsigned_extrinsic(
1416				best_hash,
1417				equivocation_proof,
1418				key_owner_proof,
1419			)
1420			.map_err(Error::RuntimeApi)?;
1421
1422		info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);
1423
1424		Ok(())
1425	}
1426}
1427
1428#[async_trait::async_trait]
1429impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
1430	for BabeBlockImport<Block, Client, Inner, CIDP, SC>
1431where
1432	Block: BlockT,
1433	Inner: BlockImport<Block> + Send + Sync,
1434	Inner::Error: Into<ConsensusError>,
1435	Client: HeaderBackend<Block>
1436		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1437		+ AuxStore
1438		+ ProvideRuntimeApi<Block>
1439		+ Send
1440		+ Sync,
1441	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1442	CIDP: CreateInherentDataProviders<Block, ()>,
1443	CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
1444	SC: SelectChain<Block> + 'static,
1445{
1446	type Error = ConsensusError;
1447
1448	async fn import_block(
1449		&self,
1450		mut block: BlockImportParams<Block>,
1451	) -> Result<ImportResult, Self::Error> {
1452		let hash = block.post_hash();
1453		let number = *block.header.number();
1454		let info = self.client.info();
1455
1456		self.check_inherents_and_equivocations(&mut block).await?;
1457
1458		let block_status = self
1459			.client
1460			.status(hash)
1461			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1462
1463		// Skip BABE logic if the block is already in the chain or when importing blocks during
1464		// initial sync, otherwise the check for epoch changes will error because we would be trying
1465		// to re-import an epoch change or because of missing epoch data in the tree, respectively.
1466		if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
1467			block_status == BlockStatus::InChain
1468		{
1469			// When re-importing existing block strip away intermediates.
1470			// In case of initial sync intermediates should not be present...
1471			let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
1472			block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
1473			return self.inner.import_block(block).await.map_err(Into::into)
1474		}
1475
1476		if block.with_state() {
1477			return self.import_state(block).await
1478		}
1479
1480		let pre_digest = find_pre_digest::<Block>(&block.header).expect(
1481			"valid babe headers must contain a predigest; header has been already verified; qed",
1482		);
1483		let slot = pre_digest.slot();
1484
1485		let parent_hash = *block.header.parent_hash();
1486		let parent_header = self
1487			.client
1488			.header(parent_hash)
1489			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
1490			.ok_or_else(|| {
1491				ConsensusError::ChainLookup(
1492					babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
1493				)
1494			})?;
1495
1496		let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
1497			"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
1498			 been verified; qed",
1499		);
1500
1501		// make sure that slot number is strictly increasing
1502		if slot <= parent_slot {
1503			return Err(ConsensusError::ClientImport(
1504				babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
1505			))
1506		}
1507
1508		// if there's a pending epoch we'll save the previous epoch changes here
1509		// this way we can revert it if there's any error
1510		let mut old_epoch_changes = None;
1511
1512		// Use an extra scope to make the compiler happy, because otherwise it complains about the
1513		// mutex, even if we dropped it...
1514		let mut epoch_changes = {
1515			let mut epoch_changes = self.epoch_changes.shared_data_locked();
1516
1517			// check if there's any epoch change expected to happen at this slot.
1518			// `epoch` is the epoch to verify the block under, and `first_in_epoch` is true
1519			// if this is the first block in its chain for that epoch.
1520			//
1521			// also provides the total weight of the chain, including the imported block.
1522			let (epoch_descriptor, first_in_epoch, parent_weight) = {
1523				let parent_weight = if *parent_header.number() == Zero::zero() {
1524					0
1525				} else {
1526					aux_schema::load_block_weight(&*self.client, parent_hash)
1527						.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1528						.ok_or_else(|| {
1529							ConsensusError::ClientImport(
1530								babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
1531									.into(),
1532							)
1533						})?
1534				};
1535
1536				let intermediate =
1537					block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;
1538
1539				let epoch_descriptor = intermediate.epoch_descriptor;
1540				let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
1541				(epoch_descriptor, first_in_epoch, parent_weight)
1542			};
1543
1544			let total_weight = parent_weight + pre_digest.added_weight();
1545
1546			// search for this all the time so we can reject unexpected announcements.
1547			let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
1548				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1549			let next_config_digest = find_next_config_digest::<Block>(&block.header)
1550				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1551
1552			match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
1553				(true, true, _) => {},
1554				(false, false, false) => {},
1555				(false, false, true) =>
1556					return Err(ConsensusError::ClientImport(
1557						babe_err(Error::<Block>::UnexpectedConfigChange).into(),
1558					)),
1559				(true, false, _) =>
1560					return Err(ConsensusError::ClientImport(
1561						babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
1562					)),
1563				(false, true, _) =>
1564					return Err(ConsensusError::ClientImport(
1565						babe_err(Error::<Block>::UnexpectedEpochChange).into(),
1566					)),
1567			}
1568
1569			if let Some(next_epoch_descriptor) = next_epoch_digest {
1570				old_epoch_changes = Some((*epoch_changes).clone());
1571
1572				let mut viable_epoch = epoch_changes
1573					.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
1574					.ok_or_else(|| {
1575						ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
1576					})?
1577					.into_cloned();
1578
1579				let epoch_config = next_config_digest
1580					.map(Into::into)
1581					.unwrap_or_else(|| viable_epoch.as_ref().config.clone());
1582
1583				// restrict info logging during initial sync to avoid spam
1584				let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
1585					log::Level::Debug
1586				} else {
1587					log::Level::Info
1588				};
1589
1590				if viable_epoch.as_ref().end_slot() <= slot {
1591					// Some epochs must have been skipped, as the current slot falls outside the
1592					// current epoch. We figure out which epoch the slot belongs to and re-use the
1593					// same epoch data for it.
1594					// Notice that we are only updating a local copy of the `Epoch`, this
1595					// makes it so that when we insert the next epoch into `EpochChanges` below
1596					// (after incrementing it), it will use the correct epoch index and start slot.
1597					// We do not update the original epoch that will be re-used because there might
1598					// be other forks (that we haven't imported) where the epoch isn't skipped, and
1599					// to import those forks we want to keep the original epoch data. Not updating
1600					// the original epoch works because when we search the tree for which epoch to
1601					// use for a given slot, we will search in-depth with the predicate
1602					// `epoch.start_slot <= slot` which will still match correctly without updating
1603					// `start_slot` to the correct value as below.
1604					let epoch = viable_epoch.as_mut();
1605					let prev_index = epoch.epoch_index;
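					// `clone_for_slot` re-keys the local copy so its epoch index and start slot
					// correspond to the epoch that actually contains `slot`, while re-using the
					// rest of the epoch data.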
1606					*epoch = epoch.clone_for_slot(slot);
1607
1608					warn!(
1609						target: LOG_TARGET,
1610						"👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
1611					);
1612				}
1613
1614				log!(
1615					target: LOG_TARGET,
1616					log_level,
1617					"👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
1618					viable_epoch.as_ref().epoch_index,
1619					hash,
1620					slot,
1621					viable_epoch.as_ref().start_slot,
1622				);
1623
1624				let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));
1625
1626				log!(
1627					target: LOG_TARGET,
1628					log_level,
1629					"👶 Next epoch starts at slot {}",
1630					next_epoch.as_ref().start_slot,
1631				);
1632
1633				// prune the tree of epochs that are no longer part of the finalized
1634				// chain or no longer live, and then track the given epoch change in
1635				// the tree.
1636				// NOTE: it is important that these operations are done in this
1637				// order: if we pruned after importing, the `is_descendent_of` check
1638				// used by pruning might not yet know about the block that is being
1639				// imported.
1640				let prune_and_import = || {
1641					prune_finalized(self.client.clone(), &mut epoch_changes)?;
1642
1643					epoch_changes
1644						.import(
1645							descendent_query(&*self.client),
1646							hash,
1647							number,
1648							*block.header.parent_hash(),
1649							next_epoch,
1650						)
1651						.map_err(|e| {
1652							ConsensusError::ClientImport(format!(
1653								"Error importing epoch changes: {}",
1654								e
1655							))
1656						})?;
1657					Ok(())
1658				};
1659
1660				if let Err(e) = prune_and_import() {
1661					debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
1662					*epoch_changes =
1663						old_epoch_changes.expect("set `Some` above and not taken; qed");
1664					return Err(e)
1665				}
1666
1667				crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
1668					block
1669						.auxiliary
1670						.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1671				});
1672			}
1673
1674			aux_schema::write_block_weight(hash, total_weight, |values| {
1675				block
1676					.auxiliary
1677					.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1678			});
1679
1680			// The fork choice rule is that we pick the heaviest chain (i.e. the one
1681			// with more primary blocks); if there's a tie we go with the longest
1682			// chain.
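			// For example, a chain with 10 primary blocks is preferred over a longer chain
			// with only 9, and between chains of equal weight the one with the higher block
			// number wins.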
1683			block.fork_choice = {
1684				let (last_best, last_best_number) = (info.best_hash, info.best_number);
1685
1686				let last_best_weight = if &last_best == block.header.parent_hash() {
1687					// the parent == genesis case is already handled above when loading the
1688					// parent weight, so we don't need to handle it again here.
1689					parent_weight
1690				} else {
1691					aux_schema::load_block_weight(&*self.client, last_best)
1692						.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
1693						.ok_or_else(|| {
1694							ConsensusError::ChainLookup(
1695								"No block weight for parent header.".to_string(),
1696							)
1697						})?
1698				};
1699
1700				Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
1701					true
1702				} else if total_weight == last_best_weight {
1703					number > last_best_number
1704				} else {
1705					false
1706				}))
1707			};
1708
1709			// Release the mutex but keep the shared data locked, so it can be upgraded to revert on error.
1710			epoch_changes.release_mutex()
1711		};
1712
1713		let import_result = self.inner.import_block(block).await;
1714
1715		// revert to the original epoch changes in case there's an error
1716		// importing the block
1717		if import_result.is_err() {
1718			if let Some(old_epoch_changes) = old_epoch_changes {
1719				*epoch_changes.upgrade() = old_epoch_changes;
1720			}
1721		}
1722
1723		import_result.map_err(Into::into)
1724	}
1725
1726	async fn check_block(
1727		&self,
1728		block: BlockCheckParams<Block>,
1729	) -> Result<ImportResult, Self::Error> {
1730		self.inner.check_block(block).await.map_err(Into::into)
1731	}
1732}
1733
1734/// Prunes the given epoch tree using the client's best finalized block and its slot.
1735fn prune_finalized<Block, Client>(
1736	client: Arc<Client>,
1737	epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1738) -> Result<(), ConsensusError>
1739where
1740	Block: BlockT,
1741	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
1742{
1743	let info = client.info();
1744
1745	let finalized_slot = {
1746		let finalized_header = client
1747			.header(info.finalized_hash)
1748			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1749			.expect(
1750				"best finalized hash was given by client; finalized headers must exist in db; qed",
1751			);
1752
1753		find_pre_digest::<Block>(&finalized_header)
1754			.expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1755			.slot()
1756	};
1757
1758	epoch_changes
1759		.prune_finalized(
1760			descendent_query(&*client),
1761			&info.finalized_hash,
1762			info.finalized_number,
1763			finalized_slot,
1764		)
1765		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1766
1767	Ok(())
1768}
1769
1770/// Produce a BABE block-import object to be used later on in the construction of
1771/// an import-queue.
1772///
1773/// Also returns a link object used to correctly instantiate the import queue
1774/// and background worker.
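///
/// # Example
///
/// A minimal sketch of wiring up the BABE block import during service setup. The
/// `grandpa_block_import`, `client`, `create_inherent_data_providers`, `select_chain` and
/// `offchain_tx_pool_factory` values are assumed to come from the surrounding node service
/// and are not defined here.
///
/// ```ignore
/// let (babe_block_import, babe_link) = sc_consensus_babe::block_import(
/// 	sc_consensus_babe::configuration(&*client)?,
/// 	grandpa_block_import,
/// 	client.clone(),
/// 	create_inherent_data_providers,
/// 	select_chain,
/// 	offchain_tx_pool_factory,
/// )?;
/// ```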
1775pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1776	config: BabeConfiguration,
1777	wrapped_block_import: I,
1778	client: Arc<Client>,
1779	create_inherent_data_providers: CIDP,
1780	select_chain: SC,
1781	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1782) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1783where
1784	Client: AuxStore
1785		+ HeaderBackend<Block>
1786		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1787		+ PreCommitActions<Block>
1788		+ 'static,
1789{
1790	let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1791	let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1792
1793	// NOTE: this isn't strictly necessary, but since older versions did not prune the
1794	// epoch tree, doing it here acts as a migration: nodes prune long trees on startup
1795	// rather than waiting until the next epoch change block is imported.
1796	prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1797
1798	let client_weak = Arc::downgrade(&client);
1799	let on_finality = move |summary: &FinalityNotification<Block>| {
1800		if let Some(client) = client_weak.upgrade() {
1801			aux_storage_cleanup(client.as_ref(), summary)
1802		} else {
1803			Default::default()
1804		}
1805	};
1806	client.register_finality_action(Box::new(on_finality));
1807
1808	let import = BabeBlockImport::new(
1809		client,
1810		epoch_changes,
1811		wrapped_block_import,
1812		config,
1813		create_inherent_data_providers,
1814		select_chain,
1815		offchain_tx_pool_factory,
1816	);
1817
1818	Ok((import, link))
1819}
1820
1821/// Parameters passed to [`import_queue`].
1822pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
1823	/// The BABE link that is created by [`block_import`].
1824	pub link: BabeLink<Block>,
1825	/// The block import that should be wrapped.
1826	pub block_import: BI,
1827	/// Optional justification import.
1828	pub justification_import: Option<BoxJustificationImport<Block>>,
1829	/// The client to interact with the internals of the node.
1830	pub client: Arc<Client>,
1831	/// Slot duration.
1832	pub slot_duration: SlotDuration,
1833	/// Spawner for spawning futures.
1834	pub spawner: &'a Spawn,
1835	/// Registry for prometheus metrics.
1836	pub registry: Option<&'a Registry>,
1837	/// Optional telemetry handle to report telemetry events.
1838	pub telemetry: Option<TelemetryHandle>,
1839}
1840
1841/// Start an import queue for the BABE consensus algorithm.
1842///
1843/// This method returns the import queue and a [`BabeWorkerHandle`] through which epoch
1844/// data can be requested from a background worker. That worker is spawned as an essential
1845/// task on the provided `spawner`, so no additional future needs to be driven by the
1846/// caller.
1847///
1848/// The block import object provided must be the `BabeBlockImport` or a wrapper
1849/// of it, otherwise crucial import logic will be omitted.
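///
/// # Example
///
/// A sketch of creating the queue during service setup, using the `babe_block_import` and
/// `babe_link` returned by [`block_import`]. The `grandpa_block_import`, `client`,
/// `task_manager`, `prometheus_registry` and `telemetry` values are assumed to be provided
/// by the surrounding node service.
///
/// ```ignore
/// let (import_queue, babe_worker_handle) =
/// 	sc_consensus_babe::import_queue(sc_consensus_babe::ImportQueueParams {
/// 		link: babe_link.clone(),
/// 		block_import: babe_block_import.clone(),
/// 		justification_import: Some(Box::new(grandpa_block_import)),
/// 		client: client.clone(),
/// 		slot_duration: babe_link.config().slot_duration(),
/// 		spawner: &task_manager.spawn_essential_handle(),
/// 		registry: prometheus_registry.as_ref(),
/// 		telemetry: telemetry.as_ref().map(|t| t.handle()),
/// 	})?;
/// ```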
1850pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1851	ImportQueueParams {
1852		link: babe_link,
1853		block_import,
1854		justification_import,
1855		client,
1856		slot_duration,
1857		spawner,
1858		registry,
1859		telemetry,
1860	}: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1861) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1862where
1863	BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1864	Client: ProvideRuntimeApi<Block>
1865		+ HeaderBackend<Block>
1866		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1867		+ AuxStore
1868		+ Send
1869		+ Sync
1870		+ 'static,
1871	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1872	Spawn: SpawnEssentialNamed,
1873{
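	// Bound on the number of pending requests from `BabeWorkerHandle` that the background
	// worker will buffer before back-pressure kicks in.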
1874	const HANDLE_BUFFER_SIZE: usize = 1024;
1875
1876	let verifier = BabeVerifier {
1877		slot_duration,
1878		config: babe_link.config.clone(),
1879		epoch_changes: babe_link.epoch_changes.clone(),
1880		telemetry,
1881		client: client.clone(),
1882	};
1883
1884	let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1885
1886	let answer_requests =
1887		answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1888
1889	spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1890
1891	Ok((
1892		BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1893		BabeWorkerHandle(worker_tx),
1894	))
1895}
1896
1897/// Reverts protocol aux data by up to `blocks` blocks, but never past the last finalized
1898/// block. In particular, epoch changes and block weights announced after the revert point
1899/// are removed.
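///
/// # Example
///
/// A sketch of how a node's revert command might use this, where `client`, `backend` and the
/// number of `blocks` to revert are assumed to come from the CLI / service setup:
///
/// ```ignore
/// // Remove BABE epoch-change and block-weight aux entries above the revert point.
/// sc_consensus_babe::revert(client.clone(), backend.clone(), blocks)?;
/// // Aux data of other protocols (e.g. GRANDPA) and the block database itself are
/// // usually reverted alongside this call.
/// ```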
1900pub fn revert<Block, Client, Backend>(
1901	client: Arc<Client>,
1902	backend: Arc<Backend>,
1903	blocks: NumberFor<Block>,
1904) -> ClientResult<()>
1905where
1906	Block: BlockT,
1907	Client: AuxStore
1908		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
1909		+ HeaderBackend<Block>
1910		+ ProvideRuntimeApi<Block>
1911		+ UsageProvider<Block>,
1912	Client::Api: BabeApi<Block>,
1913	Backend: BackendT<Block>,
1914{
1915	let best_number = client.info().best_number;
1916	let finalized = client.info().finalized_number;
1917
1918	let revertible = blocks.min(best_number - finalized);
1919	if revertible == Zero::zero() {
1920		return Ok(())
1921	}
1922
1923	let revert_up_to_number = best_number - revertible;
1924	let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1925		format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1926	))?;
1927
1928	// Revert epoch changes tree.
1929
1930	// This config is only used on genesis.
1931	let config = configuration(&*client)?;
1932	let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1933	let mut epoch_changes = epoch_changes.shared_data();
1934
1935	if revert_up_to_number == Zero::zero() {
1936		// Special case: no epoch changes data was present at genesis.
1937		*epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1938	} else {
1939		epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1940	}
1941
1942	// Remove block weights added after the revert point.
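	// Only leaves descending from the revert point are considered (their tree route to it has
	// no retracted blocks). Each such branch is then walked back towards the revert point,
	// collecting one weight key per block, and stops early on hashes already collected via
	// another leaf.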
1943
1944	let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1945
1946	let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1947		sp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1948			.map(|route| route.retracted().is_empty())
1949			.unwrap_or_default()
1950	});
1951
1952	for leaf in leaves {
1953		let mut hash = leaf;
1954		loop {
1955			let meta = client.header_metadata(hash)?;
1956			if meta.number <= revert_up_to_number ||
1957				!weight_keys.insert(aux_schema::block_weight_key(hash))
1958			{
1959				// We've reached the revert point or an already processed branch, stop here.
1960				break
1961			}
1962			hash = meta.parent;
1963		}
1964	}
1965
1966	let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1967
1968	// Write epoch changes and remove weights in one shot.
1969	aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1970		client.insert_aux(values, weight_keys.iter())
1971	})
1972}
1973
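/// Looks up the epoch descriptor for a child of `parent_hash` produced at `slot`, returning it
/// together with a cloned viable epoch (materializing the genesis epoch if needed).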
1974fn query_epoch_changes<Block, Client>(
1975	epoch_changes: &SharedEpochChanges<Block, Epoch>,
1976	client: &Client,
1977	config: &BabeConfiguration,
1978	block_number: NumberFor<Block>,
1979	slot: Slot,
1980	parent_hash: Block::Hash,
1981) -> Result<
1982	(ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
1983	Error<Block>,
1984>
1985where
1986	Block: BlockT,
1987	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
1988{
1989	let epoch_changes = epoch_changes.shared_data();
1990	let epoch_descriptor = epoch_changes
1991		.epoch_descriptor_for_child_of(
1992			descendent_query(client),
1993			&parent_hash,
1994			block_number - 1u32.into(),
1995			slot,
1996		)
1997		.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
1998		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
1999	let viable_epoch = epoch_changes
2000		.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2001		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2002	Ok((epoch_descriptor, viable_epoch.into_cloned()))
2003}