// soil_babe/lib.rs
1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! # BABE (Blind Assignment for Blockchain Extension)
8//!
9//! BABE is a slot-based block production mechanism which uses a VRF PRNG to
10//! randomly perform the slot allocation. On every slot, all the authorities
11//! generate a new random number with the VRF function and if it is lower than a
12//! given threshold (which is proportional to their weight/stake) they have a
13//! right to produce a block. The proof of the VRF function execution will be
14//! used by other peer to validate the legitimacy of the slot claim.
15//!
16//! The engine is also responsible for collecting entropy on-chain which will be
17//! used to seed the given VRF PRNG. An epoch is a contiguous number of slots
18//! under which we will be using the same authority set. During an epoch all VRF
19//! outputs produced as a result of block production will be collected on an
20//! on-chain randomness pool. Epoch changes are announced one epoch in advance,
21//! i.e. when ending epoch N, we announce the parameters (randomness,
22//! authorities, etc.) for epoch N+2.
23//!
24//! Since the slot assignment is randomized, it is possible that a slot is
25//! assigned to multiple validators in which case we will have a temporary fork,
26//! or that a slot is assigned to no validator in which case no block is
27//! produced. Which means that block times are not deterministic.
28//!
29//! The protocol has a parameter `c` [0, 1] for which `1 - c` is the probability
30//! of a slot being empty. The choice of this parameter affects the security of
31//! the protocol relating to maximum tolerable network delays.
32//!
33//! In addition to the VRF-based slot assignment described above, which we will
34//! call primary slots, the engine also supports a deterministic secondary slot
35//! assignment. Primary slots take precedence over secondary slots, when
36//! authoring the node starts by trying to claim a primary slot and falls back
37//! to a secondary slot claim attempt. The secondary slot assignment is done
38//! by picking the authority at index:
39//!
40//! `blake2_256(epoch_randomness ++ slot_number) % authorities_len`.
41//!
42//! The secondary slots supports either a `SecondaryPlain` or `SecondaryVRF`
43//! variant. Comparing with `SecondaryPlain` variant, the `SecondaryVRF` variant
44//! generates an additional VRF output. The output is not included in beacon
45//! randomness, but can be consumed by parachains.
46//!
47//! The fork choice rule is weight-based, where weight equals the number of
48//! primary blocks in the chain. We will pick the heaviest chain (more primary
49//! blocks) and will go with the longest one in case of a tie.
50//!
51//! An in-depth description and analysis of the protocol can be found here:
52//! <https://research.web3.foundation/Polkadot/protocols/block-production/Babe>
53
54#![forbid(unsafe_code)]
55#![warn(missing_docs)]
56
57pub mod rpc;
58
59use std::{
60	collections::HashSet,
61	future::Future,
62	ops::{Deref, DerefMut},
63	pin::Pin,
64	sync::Arc,
65	task::{Context, Poll},
66	time::Duration,
67};
68
69use codec::{Decode, Encode};
70use futures::{
71	channel::{
72		mpsc::{channel, Receiver, Sender},
73		oneshot,
74	},
75	prelude::*,
76};
77use log::{debug, info, log, trace, warn};
78use parking_lot::Mutex;
79use soil_prometheus::Registry;
80
81use soil_client::blockchain::{
82	Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
83	Result as ClientResult,
84};
85use soil_client::client_api::{
86	backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
87	PreCommitActions, UsageProvider,
88};
89use soil_client::consensus::{
90	BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain,
91};
92use soil_client::import::{
93	BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport,
94	DefaultImportQueue, ForkChoiceStrategy, ImportResult, StateAction, Verifier,
95};
96use soil_client::transaction_pool::OffchainTransactionPoolFactory;
97use soil_consensus::epochs::{
98	descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
99	ViableEpochDescriptor,
100};
101use soil_consensus::slots::{
102	check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
103	SlotInfo, StorageChanges,
104};
105use soil_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
106use subsoil::api::{ApiExt, ProvideRuntimeApi};
107use subsoil::application_crypto::AppCrypto;
108use subsoil::block_builder::BlockBuilder as BlockBuilderApi;
109use subsoil::consensus::babe::{inherents::BabeInherentData, SlotDuration};
110use subsoil::consensus::slots::Slot;
111use subsoil::core::traits::SpawnEssentialNamed;
112use subsoil::inherents::{CreateInherentDataProviders, InherentDataProvider};
113use subsoil::keystore::KeystorePtr;
114use subsoil::runtime::{
115	generic::OpaqueDigestItemId,
116	traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
117	DigestItem,
118};
119
120pub use soil_client::consensus::SyncOracle;
121pub use soil_consensus::slots::SlotProportion;
122pub use subsoil::consensus::babe::{
123	digests::{
124		CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
125		PrimaryPreDigest, SecondaryPlainPreDigest,
126	},
127	AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
128	BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
129};
130
131pub use aux_schema::load_block_weight as block_weight;
132use subsoil::timestamp::Timestamp;
133
134mod migration;
135mod verification;
136
137pub mod authorship;
138pub mod aux_schema;
139#[cfg(test)]
140mod tests;
141
/// Logging target used by every log/trace statement in this crate.
const LOG_TARGET: &str = "babe";

/// VRF context used for slots claiming lottery.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";

/// VRF output length for slots claiming lottery.
const AUTHORING_SCORE_LENGTH: usize = 16;
149
/// BABE epoch information.
///
/// Newtype around the runtime's epoch type so that client-side trait
/// implementations (e.g. [`EpochT`]) can be attached to it.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(subsoil::consensus::babe::Epoch);
153
154impl Deref for Epoch {
155	type Target = subsoil::consensus::babe::Epoch;
156
157	fn deref(&self) -> &Self::Target {
158		&self.0
159	}
160}
161
162impl DerefMut for Epoch {
163	fn deref_mut(&mut self) -> &mut Self::Target {
164		&mut self.0
165	}
166}
167
168impl From<subsoil::consensus::babe::Epoch> for Epoch {
169	fn from(epoch: subsoil::consensus::babe::Epoch) -> Self {
170		Epoch(epoch)
171	}
172}
173
174impl EpochT for Epoch {
175	type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
176	type Slot = Slot;
177
178	fn increment(
179		&self,
180		(descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
181	) -> Epoch {
182		subsoil::consensus::babe::Epoch {
183			epoch_index: self.epoch_index + 1,
184			start_slot: self.start_slot + self.duration,
185			duration: self.duration,
186			authorities: descriptor.authorities,
187			randomness: descriptor.randomness,
188			config,
189		}
190		.into()
191	}
192
193	fn start_slot(&self) -> Slot {
194		self.start_slot
195	}
196
197	fn end_slot(&self) -> Slot {
198		self.start_slot + self.duration
199	}
200}
201
impl Epoch {
	/// Create the genesis epoch (epoch #0).
	///
	/// This is defined to start at the slot of the first block, so that has to be provided.
	pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
		subsoil::consensus::babe::Epoch {
			epoch_index: 0,
			start_slot: slot,
			duration: genesis_config.epoch_length,
			authorities: genesis_config.authorities.clone(),
			randomness: genesis_config.randomness,
			config: BabeEpochConfiguration {
				c: genesis_config.c,
				allowed_slots: genesis_config.allowed_slots,
			},
		}
		.into()
	}

	/// Clone and tweak epoch information to refer to the specified slot.
	///
	/// All the information which depends on the slot value is recomputed and assigned
	/// to the returned epoch instance.
	///
	/// The `slot` must be greater than or equal the original epoch start slot,
	/// if is less this operation is equivalent to a simple clone.
	pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
		let mut epoch = self.clone();

		// Whole epochs elapsed between this epoch's start and `slot`.
		// `saturating_sub` makes a `slot` before `start_slot` yield zero
		// skipped epochs, i.e. the plain-clone behavior documented above.
		let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;

		let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
			"epoch number is u64; it should be strictly smaller than number of slots; \
				slots relate in some way to wall clock time; \
				if u64 is not enough we should crash for safety; qed.",
		);

		// Advance the start slot by the skipped span; overflow means slot
		// arithmetic left the representable range, which is unrecoverable.
		let start_slot = skipped_epochs
			.checked_mul(epoch.duration)
			.and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
			.expect(
				"slot number is u64; it should relate in some way to wall clock time; \
				 if u64 is not enough we should crash for safety; qed.",
			);

		epoch.epoch_index = epoch_index;
		epoch.start_slot = Slot::from(start_slot);

		epoch
	}
}
253
/// Errors encountered by the babe authorship task.
///
/// Generic over the block type `B` so hash-carrying variants can reference
/// the chain's concrete hash type. Display text comes from the `thiserror`
/// attributes on each variant.
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
	/// Multiple BABE pre-runtime digests
	#[error("Multiple BABE pre-runtime digests, rejecting!")]
	MultiplePreRuntimeDigests,
	/// No BABE pre-runtime digest found
	#[error("No BABE pre-runtime digest found")]
	NoPreRuntimeDigest,
	/// Multiple BABE epoch change digests
	#[error("Multiple BABE epoch change digests, rejecting!")]
	MultipleEpochChangeDigests,
	/// Multiple BABE config change digests
	#[error("Multiple BABE config change digests, rejecting!")]
	MultipleConfigChangeDigests,
	/// Could not extract timestamp and slot
	#[error("Could not extract timestamp and slot: {0}")]
	Extraction(ConsensusError),
	/// Could not fetch epoch
	#[error("Could not fetch epoch at {0:?}")]
	FetchEpoch(B::Hash),
	/// Header rejected: too far in the future
	#[error("Header {0:?} rejected: too far in the future")]
	TooFarInFuture(B::Hash),
	/// Parent unavailable. Cannot import
	#[error("Parent ({0}) of {1} unavailable. Cannot import")]
	ParentUnavailable(B::Hash, B::Hash),
	/// Slot number must increase
	#[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
	SlotMustIncrease(Slot, Slot),
	/// Header has a bad seal
	#[error("Header {0:?} has a bad seal")]
	HeaderBadSeal(B::Hash),
	/// Header is unsealed
	#[error("Header {0:?} is unsealed")]
	HeaderUnsealed(B::Hash),
	/// Slot author not found
	#[error("Slot author not found")]
	SlotAuthorNotFound,
	/// Secondary slot assignments are disabled for the current epoch.
	#[error("Secondary slot assignments are disabled for the current epoch.")]
	SecondarySlotAssignmentsDisabled,
	/// Bad signature
	#[error("Bad signature on {0:?}")]
	BadSignature(B::Hash),
	/// Invalid author: Expected secondary author
	#[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
	InvalidAuthor(AuthorityId, AuthorityId),
	/// No secondary author expected.
	#[error("No secondary author expected.")]
	NoSecondaryAuthorExpected,
	/// VRF verification failed
	#[error("VRF verification failed")]
	VrfVerificationFailed,
	/// Primary slot threshold too low
	#[error("VRF output rejected, threshold {0} exceeded")]
	VrfThresholdExceeded(u128),
	/// Could not fetch parent header
	#[error("Could not fetch parent header: {0}")]
	FetchParentHeader(soil_client::blockchain::Error),
	/// Expected epoch change to happen.
	#[error("Expected epoch change to happen at {0:?}, s{1}")]
	ExpectedEpochChange(B::Hash, Slot),
	/// Unexpected config change.
	#[error("Unexpected config change")]
	UnexpectedConfigChange,
	/// Unexpected epoch change
	#[error("Unexpected epoch change")]
	UnexpectedEpochChange,
	/// Parent block has no associated weight
	#[error("Parent block of {0} has no associated weight")]
	ParentBlockNoAssociatedWeight(B::Hash),
	/// Check inherents error
	#[error("Checking inherents failed: {0}")]
	CheckInherents(subsoil::inherents::Error),
	/// Unhandled check inherents error
	#[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
	CheckInherentsUnhandled(subsoil::inherents::InherentIdentifier),
	/// Create inherents error.
	#[error("Creating inherents failed: {0}")]
	CreateInherents(subsoil::inherents::Error),
	/// Background worker is not running and therefore requests cannot be answered.
	#[error("Background worker is not running")]
	BackgroundWorkerTerminated,
	/// Client error
	#[error(transparent)]
	Client(soil_client::blockchain::Error),
	/// Runtime Api error.
	#[error(transparent)]
	RuntimeApi(subsoil::api::ApiError),
	/// Fork tree error
	#[error(transparent)]
	ForkTree(Box<soil_fork_tree::Error<soil_client::blockchain::Error>>),
}
348
349impl<B: BlockT> From<Error<B>> for String {
350	fn from(error: Error<B>) -> String {
351		error.to_string()
352	}
353}
354
355fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
356	debug!(target: LOG_TARGET, "{}", error);
357	error
358}
359
/// Intermediate value passed to block importer.
///
/// Attached to [`BlockImportParams`] under [`INTERMEDIATE_KEY`] during
/// authoring so the import pipeline can reuse the resolved epoch descriptor.
pub struct BabeIntermediate<B: BlockT> {
	/// The epoch descriptor.
	pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}

/// Intermediate key for Babe engine.
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
368
369/// Read configuration from the runtime state at current best block.
370pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
371where
372	C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
373	C::Api: BabeApi<B>,
374{
375	let at_hash = if client.usage_info().chain.finalized_state.is_some() {
376		client.usage_info().chain.best_hash
377	} else {
378		debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
379		client.usage_info().chain.genesis_hash
380	};
381
382	let runtime_api = client.runtime_api();
383	let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
384
385	let config = match version {
386		Some(1) => {
387			#[allow(deprecated)]
388			{
389				runtime_api.configuration_before_version_2(at_hash)?.into()
390			}
391		},
392		Some(2) => runtime_api.configuration(at_hash)?,
393		_ => {
394			return Err(soil_client::blockchain::Error::VersionInvalid(
395				"Unsupported or invalid BabeApi version".to_string(),
396			))
397		},
398	};
399	Ok(config)
400}
401
/// Parameters for BABE.
///
/// Bundles everything [`start_babe`] needs to assemble the authoring worker.
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
	/// The keystore that manages the keys of the node.
	pub keystore: KeystorePtr,

	/// The client to use
	pub client: Arc<C>,

	/// The SelectChain Strategy
	pub select_chain: SC,

	/// The environment we are producing blocks for.
	pub env: E,

	/// The underlying block-import object to supply our produced blocks to.
	/// This must be a `BabeBlockImport` or a wrapper of it, otherwise
	/// critical consensus logic will be omitted.
	pub block_import: I,

	/// A sync oracle, used to determine whether the node is major-syncing.
	pub sync_oracle: SO,

	/// Hook into the sync module to control the justification sync process.
	pub justification_sync_link: L,

	/// Something that can create the inherent data providers.
	pub create_inherent_data_providers: CIDP,

	/// Force authoring of blocks even if we are offline
	pub force_authoring: bool,

	/// Strategy and parameters for backing off block production.
	pub backoff_authoring_blocks: Option<BS>,

	/// The source of timestamps for relative slots
	pub babe_link: BabeLink<B>,

	/// The proportion of the slot dedicated to proposing.
	///
	/// The block proposing will be limited to this proportion of the slot from the starting of the
	/// slot. However, the proposing can still take longer when there is some lenience factor
	/// applied, because there were no blocks produced for some slots.
	pub block_proposal_slot_portion: SlotProportion,

	/// The maximum proportion of the slot dedicated to proposing with any lenience factor applied
	/// due to no blocks being produced.
	pub max_block_proposal_slot_portion: Option<SlotProportion>,

	/// Handle used to report telemetry.
	pub telemetry: Option<TelemetryHandle>,
}
453
454/// Start the babe worker.
455pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
456	BabeParams {
457		keystore,
458		client,
459		select_chain,
460		env,
461		block_import,
462		sync_oracle,
463		justification_sync_link,
464		create_inherent_data_providers,
465		force_authoring,
466		backoff_authoring_blocks,
467		babe_link,
468		block_proposal_slot_portion,
469		max_block_proposal_slot_portion,
470		telemetry,
471	}: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
472) -> Result<BabeWorker<B>, ConsensusError>
473where
474	B: BlockT,
475	C: ProvideRuntimeApi<B>
476		+ HeaderBackend<B>
477		+ HeaderMetadata<B, Error = ClientError>
478		+ Send
479		+ Sync
480		+ 'static,
481	C::Api: BabeApi<B>,
482	SC: SelectChain<B> + 'static,
483	E: Environment<B, Error = Error> + Send + Sync + 'static,
484	E::Proposer: Proposer<B, Error = Error>,
485	I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
486	SO: SyncOracle + Send + Sync + Clone + 'static,
487	L: soil_client::import::JustificationSyncLink<B> + 'static,
488	CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
489	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
490	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
491	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
492{
493	let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
494
495	let worker = BabeSlotWorker {
496		client: client.clone(),
497		block_import,
498		env,
499		sync_oracle: sync_oracle.clone(),
500		justification_sync_link,
501		force_authoring,
502		backoff_authoring_blocks,
503		keystore,
504		epoch_changes: babe_link.epoch_changes.clone(),
505		slot_notification_sinks: slot_notification_sinks.clone(),
506		config: babe_link.config.clone(),
507		block_proposal_slot_portion,
508		max_block_proposal_slot_portion,
509		telemetry,
510	};
511
512	info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");
513
514	let slot_worker = soil_consensus::slots::start_slot_worker(
515		babe_link.config.slot_duration(),
516		select_chain,
517		soil_consensus::slots::SimpleSlotWorkerToSlotWorker(worker),
518		sync_oracle,
519		create_inherent_data_providers,
520	);
521
522	Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
523}
524
525// Remove obsolete block's weight data by leveraging finality notifications.
526// This includes data for all finalized blocks (excluding the most recent one)
527// and all stale branches.
528fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
529	client: &C,
530	notification: &FinalityNotification<Block>,
531) -> AuxDataOperations {
532	let mut hashes = HashSet::new();
533
534	let first = notification.tree_route.first().unwrap_or(&notification.hash);
535	match client.header_metadata(*first) {
536		Ok(meta) => {
537			hashes.insert(meta.parent);
538		},
539		Err(err) => {
540			warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
541		},
542	}
543
544	// Cleans data for finalized block's ancestors
545	hashes.extend(
546		notification
547			.tree_route
548			.iter()
549			// Ensure we don't prune latest finalized block.
550			// This should not happen, but better be safe than sorry!
551			.filter(|h| **h != notification.hash),
552	);
553
554	hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));
555
556	hashes
557		.into_iter()
558		.map(|val| (aux_schema::block_weight_key(val), None))
559		.collect()
560}
561
562async fn answer_requests<B: BlockT, C>(
563	mut request_rx: Receiver<BabeRequest<B>>,
564	config: BabeConfiguration,
565	client: Arc<C>,
566	epoch_changes: SharedEpochChanges<B, Epoch>,
567) where
568	C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
569{
570	while let Some(request) = request_rx.next().await {
571		match request {
572			BabeRequest::EpochData(response) => {
573				let _ = response.send(epoch_changes.shared_data().clone());
574			},
575			BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
576				let lookup = || {
577					let epoch_changes = epoch_changes.shared_data();
578					epoch_changes
579						.epoch_data_for_child_of(
580							descendent_query(&*client),
581							&parent_hash,
582							parent_number,
583							slot,
584							|slot| Epoch::genesis(&config, slot),
585						)
586						.map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
587						.ok_or(Error::<B>::FetchEpoch(parent_hash))
588				};
589
590				let _ = response.send(lookup());
591			},
592		}
593	}
594}
595
/// Requests to the BABE service.
///
/// Sent by [`BabeWorkerHandle`] and answered by the `answer_requests` loop;
/// each variant carries a oneshot sender for the reply.
enum BabeRequest<B: BlockT> {
	/// Request all available epoch data.
	EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
	/// Request the epoch that a child of the given block, with the given slot number would have.
	///
	/// The parent block is identified by its hash and number.
	EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}
605
/// A handle to the BABE worker for issuing requests.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);

impl<B: BlockT> BabeWorkerHandle<B> {
	/// Send a request to the background worker.
	///
	/// Returns `Err(BackgroundWorkerTerminated)` when the worker has shut
	/// down. Any other send failure is only logged: the dropped request owns
	/// the response channel, so the caller's `rx.await` will observe the
	/// cancellation and report termination itself.
	async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
		match self.0.clone().send(request).await {
			Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
			Err(err) => warn!(
				target: LOG_TARGET,
				"Unhandled error when sending request to worker: {:?}", err
			),
			_ => {},
		}

		Ok(())
	}

	/// Fetch all available epoch data.
	pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
		let (tx, rx) = oneshot::channel();
		self.send_request(BabeRequest::EpochData(tx)).await?;

		// A canceled response channel means the worker went away.
		rx.await.or(Err(Error::BackgroundWorkerTerminated))
	}

	/// Fetch the epoch that a child of the given block, with the given slot number would have.
	///
	/// The parent block is identified by its hash and number.
	pub async fn epoch_data_for_child_of(
		&self,
		parent_hash: B::Hash,
		parent_number: NumberFor<B>,
		slot: Slot,
	) -> Result<Epoch, Error<B>> {
		let (tx, rx) = oneshot::channel();
		self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
			.await?;

		// The channel itself carries a `Result`: the trailing `?` maps
		// channel cancellation to worker termination, while the tail value
		// is the worker's own lookup result.
		rx.await.or(Err(Error::BackgroundWorkerTerminated))?
	}
}
648
/// Worker for Babe which implements `Future<Output=()>`. This must be polled.
#[must_use]
pub struct BabeWorker<B: BlockT> {
	// The boxed slot-worker loop produced by `start_babe`.
	inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
	// Shared list of subscriber channels fed on every slot notification.
	slot_notification_sinks: SlotNotificationSinks<B>,
}
655
656impl<B: BlockT> BabeWorker<B> {
657	/// Return an event stream of notifications for when new slot happens, and the corresponding
658	/// epoch descriptor.
659	pub fn slot_notification_stream(
660		&self,
661	) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
662		const CHANNEL_BUFFER_SIZE: usize = 1024;
663
664		let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
665		self.slot_notification_sinks.lock().push(sink);
666		stream
667	}
668}
669
670impl<B: BlockT> Future for BabeWorker<B> {
671	type Output = ();
672
673	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
674		self.inner.as_mut().poll(cx)
675	}
676}
677
/// Slot notification sinks.
///
/// Shared, lock-protected list of subscriber channels; each entry receives
/// the slot and its epoch descriptor when a new slot fires.
type SlotNotificationSinks<B> = Arc<
	Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;
682
/// State for the per-slot authoring logic; see the `SimpleSlotWorker`
/// implementation below for how each field is used.
struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
	// Client used for header lookups and epoch-descriptor queries.
	client: Arc<C>,
	// Sink for blocks authored by this worker.
	block_import: I,
	// Environment creating a proposer per authored block.
	env: E,
	// Oracle reporting sync status to the slot loop.
	sync_oracle: SO,
	// Hook into justification syncing.
	justification_sync_link: L,
	// When true, author even while offline.
	force_authoring: bool,
	// Optional strategy for backing off block production.
	backoff_authoring_blocks: Option<BS>,
	// Keystore holding the authority keys used for claiming and sealing.
	keystore: KeystorePtr,
	// Shared view of the epoch-change tree.
	epoch_changes: SharedEpochChanges<B, Epoch>,
	// Subscriber channels notified on every slot.
	slot_notification_sinks: SlotNotificationSinks<B>,
	// Genesis BABE configuration (used to build epoch #0 on demand).
	config: BabeConfiguration,
	// Portion of the slot reserved for proposing.
	block_proposal_slot_portion: SlotProportion,
	// Upper bound on the proposing portion after lenience is applied.
	max_block_proposal_slot_portion: Option<SlotProportion>,
	// Optional telemetry handle.
	telemetry: Option<TelemetryHandle>,
}
699
#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> soil_consensus::slots::SimpleSlotWorker<B>
	for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
	B: BlockT,
	C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
	C::Api: BabeApi<B>,
	E: Environment<B, Error = Error> + Send + Sync,
	E::Proposer: Proposer<B, Error = Error>,
	I: BlockImport<B> + Send + Sync + 'static,
	SO: SyncOracle + Send + Clone + Sync,
	L: soil_client::import::JustificationSyncLink<B>,
	BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
	Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
	/// A successful claim: the pre-digest to embed and the claiming authority.
	type Claim = (PreDigest, AuthorityId);
	type SyncOracle = SO;
	type JustificationSyncLink = L;
	type CreateProposer =
		Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
	type Proposer = E::Proposer;
	type BlockImport = I;
	/// Epoch descriptor resolved per slot and threaded through authoring.
	type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;

	fn logging_target(&self) -> &'static str {
		LOG_TARGET
	}

	fn block_import(&mut self) -> &mut Self::BlockImport {
		&mut self.block_import
	}

	// Resolve the epoch descriptor a child of `parent` at `slot` belongs to.
	fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
		self.epoch_changes
			.shared_data()
			.epoch_descriptor_for_child_of(
				descendent_query(&*self.client),
				&parent.hash(),
				*parent.number(),
				slot,
			)
			.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
			.ok_or(ConsensusError::InvalidAuthoritiesSet)
	}

	// Number of authorities in the epoch, if the descriptor is viable.
	fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
		self.epoch_changes
			.shared_data()
			.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
			.map(|epoch| epoch.as_ref().authorities.len())
	}

	// Attempt to claim `slot` with any of our keystore's authority keys.
	async fn claim_slot(
		&mut self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) -> Option<Self::Claim> {
		debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
		let s = authorship::claim_slot(
			slot,
			self.epoch_changes
				.shared_data()
				.viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
				.as_ref(),
			&self.keystore,
		);

		if s.is_some() {
			debug!(target: LOG_TARGET, "Claimed slot {}", slot);
		}

		s
	}

	// Broadcast the slot and its epoch descriptor to all subscribers,
	// dropping sinks whose receiving end has gone away.
	fn notify_slot(
		&self,
		_parent_header: &B::Header,
		slot: Slot,
		epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
	) {
		let sinks = &mut self.slot_notification_sinks.lock();
		sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
			Ok(()) => true,
			Err(e) => {
				// A full channel is kept (subscriber may catch up); a
				// disconnected one is removed.
				if e.is_full() {
					warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
					true
				} else {
					false
				}
			},
		});
	}

	// The claimed pre-digest becomes the block's single BABE pre-runtime item.
	fn pre_digest_data(
		&self,
		_slot: Slot,
		claim: &Self::Claim,
	) -> Vec<subsoil::runtime::DigestItem> {
		vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
	}

	// Seal the authored header with the claiming authority's signature and
	// assemble the import parameters, attaching the epoch descriptor as an
	// intermediate for the import pipeline.
	async fn block_import_params(
		&self,
		header: B::Header,
		header_hash: &B::Hash,
		body: Vec<B::Extrinsic>,
		storage_changes: StorageChanges<B>,
		(_, public): Self::Claim,
		epoch_descriptor: Self::AuxData,
	) -> Result<BlockImportParams<B>, ConsensusError> {
		let signature = self
			.keystore
			.sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
			.map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
			.ok_or_else(|| {
				ConsensusError::CannotSign(format!(
					"Could not find key in keystore. Key: {:?}",
					public
				))
			})?;

		let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());

		let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
		import_block.post_digests.push(digest_item);
		import_block.body = Some(body);
		import_block.state_action = StateAction::ApplyChanges(
			soil_client::import::StorageChanges::Changes(storage_changes),
		);
		import_block
			.insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });

		Ok(import_block)
	}

	fn force_authoring(&self) -> bool {
		self.force_authoring
	}

	// Consult the configured strategy, comparing the chain head's slot (from
	// its pre-digest) against the current slot and finalized number.
	fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
		if let Some(ref strategy) = self.backoff_authoring_blocks {
			if let Ok(chain_head_slot) =
				find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
			{
				return strategy.should_backoff(
					*chain_head.number(),
					chain_head_slot,
					self.client.info().finalized_number,
					slot,
					self.logging_target(),
				);
			}
		}
		false
	}

	fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
		&mut self.sync_oracle
	}

	fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
		&mut self.justification_sync_link
	}

	fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
		Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
	}

	fn telemetry(&self) -> Option<TelemetryHandle> {
		self.telemetry.clone()
	}

	// Remaining proposing time, with exponential lenience based on the gap
	// between the parent's slot and the current one.
	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
		let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

		soil_consensus::slots::proposing_remaining_duration(
			parent_slot,
			slot_info,
			&self.block_proposal_slot_portion,
			self.max_block_proposal_slot_portion.as_ref(),
			soil_consensus::slots::SlotLenienceType::Exponential,
			self.logging_target(),
		)
	}
}
887
888/// Extract the BABE pre digest from the given header. Pre-runtime digests are
889/// mandatory, the function will return `Err` if none is found.
890pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
891	// genesis block doesn't contain a pre digest so let's generate a
892	// dummy one to not break any invariants in the rest of the code
893	if header.number().is_zero() {
894		return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
895			slot: 0.into(),
896			authority_index: 0,
897		}));
898	}
899
900	let mut pre_digest: Option<_> = None;
901	for log in header.digest().logs() {
902		trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
903		match (log.as_babe_pre_digest(), pre_digest.is_some()) {
904			(Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
905			(None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
906			(s, false) => pre_digest = s,
907		}
908	}
909	pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
910}
911
912/// Check whether the given header contains a BABE epoch change digest.
913pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
914	find_next_epoch_digest::<B>(header).ok().flatten().is_some()
915}
916
917/// Extract the BABE epoch change digest from the given header, if it exists.
918pub fn find_next_epoch_digest<B: BlockT>(
919	header: &B::Header,
920) -> Result<Option<NextEpochDescriptor>, Error<B>> {
921	let mut epoch_digest: Option<_> = None;
922	for log in header.digest().logs() {
923		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
924		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
925		match (log, epoch_digest.is_some()) {
926			(Some(ConsensusLog::NextEpochData(_)), true) => {
927				return Err(babe_err(Error::MultipleEpochChangeDigests))
928			},
929			(Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
930			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
931		}
932	}
933
934	Ok(epoch_digest)
935}
936
937/// Extract the BABE config change digest from the given header, if it exists.
938fn find_next_config_digest<B: BlockT>(
939	header: &B::Header,
940) -> Result<Option<NextConfigDescriptor>, Error<B>> {
941	let mut config_digest: Option<_> = None;
942	for log in header.digest().logs() {
943		trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
944		let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
945		match (log, config_digest.is_some()) {
946			(Some(ConsensusLog::NextConfigData(_)), true) => {
947				return Err(babe_err(Error::MultipleConfigChangeDigests))
948			},
949			(Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
950			_ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
951		}
952	}
953
954	Ok(config_digest)
955}
956
/// State that must be shared between the import queue and the authoring logic.
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
	// Shared handle to the tree of (pending) epoch changes across all forks.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// The BABE configuration this link was created with.
	config: BabeConfiguration,
}
963
impl<Block: BlockT> BabeLink<Block> {
	/// Get the shared epoch-changes tree of this link.
	pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
		&self.epoch_changes
	}

	/// Get the BABE configuration of this link.
	pub fn config(&self) -> &BabeConfiguration {
		&self.config
	}
}
975
/// A verifier for Babe blocks.
pub struct BabeVerifier<Block: BlockT, Client> {
	// Client handle used for chain info and epoch queries during verification.
	client: Arc<Client>,
	// Expected slot duration; used to derive the current slot from wall time.
	slot_duration: SlotDuration,
	// BABE configuration used when resolving viable epochs.
	config: BabeConfiguration,
	// Shared tree of (pending) epoch changes across all forks.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// Optional telemetry handle for reporting verification events.
	telemetry: Option<TelemetryHandle>,
}
984
#[async_trait::async_trait]
impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
where
	Block: BlockT,
	Client: HeaderMetadata<Block, Error = soil_client::blockchain::Error>
		+ HeaderBackend<Block>
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync
		+ AuxStore,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
{
	// Verify the BABE seal and pre-digest of an incoming block against the
	// epoch data resolved from its parent; on success the seal is stripped
	// from the header and stashed for the block-import stage.
	async fn verify(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<BlockImportParams<Block>, String> {
		trace!(
			target: LOG_TARGET,
			"Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
			block.origin,
			block.header,
			block.justifications,
			block.body,
		);

		let hash = block.header.hash();
		let parent_hash = *block.header.parent_hash();

		let number = block.header.number();

		// Warp-synced blocks, state imports and initial-sync gap blocks are
		// trusted by other means and bypass BABE verification.
		if should_skip_verification(&*self.client, &block) {
			return Ok(block);
		}

		debug!(
			target: LOG_TARGET,
			"We have {:?} logs in this header",
			block.header.digest().logs().len()
		);

		// Wall-clock slot; used to reject headers claiming a slot too far in
		// the future.
		let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);

		let pre_digest = find_pre_digest::<Block>(&block.header)?;
		let (check_header, epoch_descriptor) = {
			// Resolve the epoch this block claims to belong to, based on its
			// parent hash and claimed slot.
			let (epoch_descriptor, viable_epoch) = query_epoch_changes(
				&self.epoch_changes,
				self.client.as_ref(),
				&self.config,
				*number,
				pre_digest.slot(),
				parent_hash,
			)?;

			// We add one to the current slot to allow for some small drift.
			// FIXME #1019 in the future, alter this queue to allow deferring of headers
			let v_params = verification::VerificationParams {
				header: block.header.clone(),
				pre_digest: Some(pre_digest),
				slot_now: slot_now + 1,
				epoch: viable_epoch.as_ref(),
			};

			(verification::check_header::<Block>(v_params)?, epoch_descriptor)
		};

		match check_header {
			CheckedHeader::Checked(pre_header, verified_info) => {
				trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
				telemetry!(
					self.telemetry;
					CONSENSUS_TRACE;
					"babe.checked_and_importing";
					"pre_header" => ?pre_header,
				);

				// Replace the header with its pre-seal form, keep the seal as
				// a post-digest, and hand the epoch descriptor to the import
				// stage via an intermediate.
				block.header = pre_header;
				block.post_digests.push(verified_info.seal);
				block.insert_intermediate(
					INTERMEDIATE_KEY,
					BabeIntermediate::<Block> { epoch_descriptor },
				);
				block.post_hash = Some(hash);

				Ok(block)
			},
			CheckedHeader::Deferred(a, b) => {
				// The header claims a slot ahead of our (drift-adjusted) clock.
				debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
				telemetry!(
					self.telemetry;
					CONSENSUS_DEBUG;
					"babe.header_too_far_in_future";
					"hash" => ?hash, "a" => ?a, "b" => ?b
				);
				Err(Error::<Block>::TooFarInFuture(hash).into())
			},
		}
	}
}
1083
1084/// Verification for imported blocks is skipped in three cases:
1085/// 1. When importing blocks below the last finalized block during network initial synchronization.
1086/// 2. When importing whole state we don't calculate epoch descriptor, but rather read it from the
1087///    state after import. We also skip all verifications because there's no parent state and we
1088///    trust the sync module to verify that the state is correct and finalized.
1089/// 3. When importing warp sync blocks that have already been verified via warp sync proof.
1090fn should_skip_verification<B: BlockT>(
1091	client: &impl HeaderBackend<B>,
1092	block: &BlockImportParams<B>,
1093) -> bool {
1094	block.origin == BlockOrigin::WarpSync || block.with_state() || {
1095		let number = *block.header.number();
1096		let info = client.info();
1097		info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
1098	}
1099}
1100
/// A block-import handler for BABE.
///
/// This scans each imported block for epoch change signals. The signals are
/// tracked in a tree (of all forks), and the import logic validates all epoch
/// change transitions, i.e. whether a given epoch change is expected or whether
/// it is missing.
///
/// The epoch change tree should be pruned as blocks are finalized.
pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
	// The wrapped block import the actual import is delegated to.
	inner: I,
	// Client handle used for header/aux lookups and runtime API access.
	client: Arc<Client>,
	// Shared tree of (pending) epoch changes across all forks.
	epoch_changes: SharedEpochChanges<Block, Epoch>,
	// Factory for the inherent data providers used when checking inherents.
	create_inherent_data_providers: CIDP,
	// BABE configuration, used e.g. to derive genesis epoch data.
	config: BabeConfiguration,
	// A [`SelectChain`] implementation.
	//
	// Used to determine the best block that should be used as basis when sending an equivocation
	// report.
	select_chain: SC,
	// The offchain transaction pool factory.
	//
	// Will be used when sending equivocation reports.
	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}
1125
1126impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
1127	for BabeBlockImport<Block, Client, I, CIDP, SC>
1128{
1129	fn clone(&self) -> Self {
1130		BabeBlockImport {
1131			inner: self.inner.clone(),
1132			client: self.client.clone(),
1133			epoch_changes: self.epoch_changes.clone(),
1134			config: self.config.clone(),
1135			create_inherent_data_providers: self.create_inherent_data_providers.clone(),
1136			select_chain: self.select_chain.clone(),
1137			offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
1138		}
1139	}
1140}
1141
1142impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
1143	fn new(
1144		client: Arc<Client>,
1145		epoch_changes: SharedEpochChanges<Block, Epoch>,
1146		block_import: I,
1147		config: BabeConfiguration,
1148		create_inherent_data_providers: CIDP,
1149		select_chain: SC,
1150		offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1151	) -> Self {
1152		BabeBlockImport {
1153			client,
1154			inner: block_import,
1155			epoch_changes,
1156			config,
1157			create_inherent_data_providers,
1158			select_chain,
1159			offchain_tx_pool_factory,
1160		}
1161	}
1162}
1163
impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
	Block: BlockT,
	Inner: BlockImport<Block> + Send + Sync,
	Inner::Error: Into<ConsensusError>,
	Client: HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
		+ AuxStore
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
	CIDP: CreateInherentDataProviders<Block, ()>,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send,
	SC: soil_client::consensus::SelectChain<Block> + 'static,
{
	/// Import whole state after warp sync.
	// This function makes multiple transactions to the DB. If one of them fails we may
	// end up in an inconsistent state and have to resync.
	async fn import_state(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<ImportResult, ConsensusError> {
		let hash = block.post_hash();
		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();

		// Force this block to be treated as the new best block.
		block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
		// Reset block weight.
		aux_schema::write_block_weight(hash, 0, |values| {
			block
				.auxiliary
				.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
		});

		// First make the client import the state.
		let import_result = self.inner.import_block(block).await;
		let aux = match import_result {
			Ok(ImportResult::Imported(aux)) => aux,
			Ok(r) => {
				return Err(ConsensusError::ClientImport(format!(
					"Unexpected import result: {:?}",
					r
				)))
			},
			Err(r) => return Err(r.into()),
		};

		// Read epoch info from the imported state.
		let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;
		let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
			ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
		})?;

		// Rebuild the epoch-changes tree from the freshly imported state and
		// persist it to the aux store.
		let mut epoch_changes = self.epoch_changes.shared_data_locked();
		epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
		aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
			self.client.insert_aux(insert, [])
		})
		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

		Ok(ImportResult::Imported(aux))
	}

	/// Check the inherents and equivocations.
	///
	/// Skipped entirely for blocks that bypass verification (warp sync, state
	/// import, initial-sync gap). Equivocation reporting errors are logged but
	/// never fail the import.
	async fn check_inherents_and_equivocations(
		&self,
		block: &mut BlockImportParams<Block>,
	) -> Result<(), ConsensusError> {
		if should_skip_verification(&*self.client, block) {
			return Ok(());
		}

		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();

		let create_inherent_data_providers = self
			.create_inherent_data_providers
			.create_inherent_data_providers(parent_hash, ())
			.await?;

		// Current slot as seen by the inherent data provider.
		let slot_now = create_inherent_data_providers.slot();

		let babe_pre_digest = find_pre_digest::<Block>(&block.header)
			.map_err(|e| ConsensusError::Other(Box::new(e)))?;
		let slot = babe_pre_digest.slot();

		// Check inherents.
		self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
			.await?;

		// Check for equivocation and report it to the runtime if needed.
		let author = {
			// Resolve the authority that authored this block from the epoch's
			// authority set, indexed by the pre-digest's authority index.
			let viable_epoch = query_epoch_changes(
				&self.epoch_changes,
				self.client.as_ref(),
				&self.config,
				number,
				slot,
				parent_hash,
			)
			.map_err(|e| ConsensusError::Other(babe_err(e).into()))?
			.1;
			match viable_epoch
				.as_ref()
				.authorities
				.get(babe_pre_digest.authority_index() as usize)
			{
				Some(author) => author.0.clone(),
				None => {
					return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into()))
				},
			}
		};
		if let Err(err) = self
			.check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
			.await
		{
			// Best-effort: a failed equivocation report must not block import.
			warn!(
				target: LOG_TARGET,
				"Error checking/reporting BABE equivocation: {}", err
			);
		}
		Ok(())
	}

	// Run the runtime's inherent checks against the block body (if present),
	// after substituting the slot from the seal into the inherent data.
	async fn check_inherents(
		&self,
		block: &mut BlockImportParams<Block>,
		at_hash: Block::Hash,
		slot: Slot,
		create_inherent_data_providers: CIDP::InherentDataProviders,
	) -> Result<(), ConsensusError> {
		if block.state_action.skip_execution_checks() {
			return Ok(());
		}

		// The body is temporarily taken out of `block` and restored below.
		if let Some(inner_body) = block.body.take() {
			let new_block = Block::new(block.header.clone(), inner_body);
			// if the body is passed through and the block was executed,
			// we need to use the runtime to check that the internally-set
			// timestamp in the inherents actually matches the slot set in the seal.
			let mut inherent_data = create_inherent_data_providers
				.create_inherent_data()
				.await
				.map_err(|e| ConsensusError::Other(Box::new(e)))?;
			inherent_data.babe_replace_inherent_data(slot);

			use subsoil::block_builder::CheckInherentsError;

			subsoil::block_builder::check_inherents_with_data(
				self.client.clone(),
				at_hash,
				new_block.clone(),
				&create_inherent_data_providers,
				inherent_data,
			)
			.await
			.map_err(|e| {
				// Map the generic check-inherents failure onto BABE errors.
				ConsensusError::Other(Box::new(match e {
					CheckInherentsError::CreateInherentData(e) => {
						Error::<Block>::CreateInherents(e)
					},
					CheckInherentsError::Client(e) => Error::RuntimeApi(e),
					CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
					CheckInherentsError::CheckInherentsUnknownError(id) => {
						Error::CheckInherentsUnhandled(id)
					},
				}))
			})?;
			let (_, inner_body) = new_block.deconstruct();
			block.body = Some(inner_body);
		}

		Ok(())
	}

	// Detect whether `author` equivocated at `slot` and, if so, submit an
	// unsigned equivocation-report extrinsic to the runtime at the best block.
	async fn check_and_report_equivocation(
		&self,
		slot_now: Slot,
		slot: Slot,
		header: &Block::Header,
		author: &AuthorityId,
		origin: &BlockOrigin,
	) -> Result<(), Error<Block>> {
		// don't report any equivocations during initial sync
		// as they are most likely stale.
		if *origin == BlockOrigin::NetworkInitialSync {
			return Ok(());
		}

		// check if authorship of this header is an equivocation and return a proof if so.
		let Some(equivocation_proof) =
			check_equivocation(&*self.client, slot_now, slot, header, author)
				.map_err(Error::Client)?
		else {
			return Ok(());
		};

		info!(
			target: LOG_TARGET,
			"Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
			author,
			slot,
			equivocation_proof.first_header.hash(),
			equivocation_proof.second_header.hash(),
		);

		// get the best block on which we will build and send the equivocation report.
		let best_hash = self
			.select_chain
			.best_chain()
			.await
			.map(|h| h.hash())
			.map_err(|e| Error::Client(e.into()))?;

		// generate a key ownership proof. we start by trying to generate the
		// key ownership proof at the parent of the equivocating header, this
		// will make sure that proof generation is successful since it happens
		// during the on-going session (i.e. session keys are available in the
		// state to be able to generate the proof). this might fail if the
		// equivocation happens on the first block of the session, in which case
		// its parent would be on the previous session. if generation on the
		// parent header fails we try with best block as well.
		let generate_key_owner_proof = |at_hash: Block::Hash| {
			self.client
				.runtime_api()
				.generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
				.map_err(Error::RuntimeApi)
		};

		let parent_hash = *header.parent_hash();
		let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
			Some(proof) => proof,
			None => match generate_key_owner_proof(best_hash)? {
				Some(proof) => proof,
				None => {
					debug!(
						target: LOG_TARGET,
						"Equivocation offender is not part of the authority set."
					);
					return Ok(());
				},
			},
		};

		// submit equivocation report at best block.
		let mut runtime_api = self.client.runtime_api();

		// Register the offchain tx pool to be able to use it from the runtime.
		runtime_api
			.register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));

		runtime_api
			.submit_report_equivocation_unsigned_extrinsic(
				best_hash,
				equivocation_proof,
				key_owner_proof,
			)
			.map_err(Error::RuntimeApi)?;

		info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);

		Ok(())
	}
}
1432
#[async_trait::async_trait]
impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
	for BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
	Block: BlockT,
	Inner: BlockImport<Block> + Send + Sync,
	Inner::Error: Into<ConsensusError>,
	Client: HeaderBackend<Block>
		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
		+ AuxStore
		+ ProvideRuntimeApi<Block>
		+ Send
		+ Sync,
	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
	CIDP: CreateInherentDataProviders<Block, ()>,
	CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
	SC: SelectChain<Block> + 'static,
{
	type Error = ConsensusError;

	// Import a block, performing BABE-specific bookkeeping (slot monotonicity,
	// epoch-change expectations, weight-based fork choice) before delegating
	// to the wrapped import; epoch-tree changes are rolled back on failure.
	async fn import_block(
		&self,
		mut block: BlockImportParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		let hash = block.post_hash();
		let parent_hash = *block.header.parent_hash();
		let number = *block.header.number();
		let info = self.client.info();

		self.check_inherents_and_equivocations(&mut block).await?;

		let block_status = self
			.client
			.status(hash)
			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

		// Skip babe logic if block already in chain or importing blocks during initial sync,
		// otherwise the check for epoch changes will error because trying to re-import an
		// epoch change or because of missing epoch data in the tree, respectively.
		if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
			|| block_status == BlockStatus::InChain
		{
			// When re-importing existing block strip away intermediates.
			// In case of initial sync intermediates should not be present...
			let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
			block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
			return self.inner.import_block(block).await.map_err(Into::into);
		}

		// Warp-synced state blocks get the dedicated import path.
		if block.with_state() {
			return self.import_state(block).await;
		}

		let pre_digest = find_pre_digest::<Block>(&block.header).expect(
			"valid babe headers must contain a predigest; header has been already verified; qed",
		);
		let slot = pre_digest.slot();

		// If there's a pending epoch we'll save the previous epoch changes here
		// this way we can revert it if there's any error.
		let mut old_epoch_changes = None;

		// Skip epoch change processing for warp synced blocks
		let epoch_changes = if block.origin != BlockOrigin::WarpSync {
			let parent_header = self
				.client
				.header(parent_hash)
				.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
				.ok_or_else(|| {
					ConsensusError::ChainLookup(
						babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
					)
				})?;

			let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
				"parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
				 been verified; qed",
			);

			// make sure that slot number is strictly increasing
			if slot <= parent_slot {
				return Err(ConsensusError::ClientImport(
					babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
				));
			}

			// Lock the shared epoch-changes tree for the rest of the import.
			let mut epoch_changes = self.epoch_changes.shared_data_locked();

			// check if there's any epoch change expected to happen at this slot.
			// `epoch` is the epoch to verify the block under, and `first_in_epoch` is true
			// if this is the first block in its chain for that epoch.
			//
			// also provides the total weight of the chain, including the imported block.
			let (epoch_descriptor, first_in_epoch, parent_weight) = {
				let parent_weight = if *parent_header.number() == Zero::zero() {
					0
				} else {
					aux_schema::load_block_weight(&*self.client, parent_hash)
						.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
						.ok_or_else(|| {
							ConsensusError::ClientImport(
								babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
									.into(),
							)
						})?
				};

				// The epoch descriptor was stashed by the verifier stage.
				let intermediate =
					block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;

				let epoch_descriptor = intermediate.epoch_descriptor;
				let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
				(epoch_descriptor, first_in_epoch, parent_weight)
			};

			let total_weight = parent_weight + pre_digest.added_weight();

			// search for this all the time so we can reject unexpected announcements.
			let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
			let next_config_digest = find_next_config_digest::<Block>(&block.header)
				.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

			// Epoch/config change digests must appear exactly when expected:
			// an epoch digest iff first-in-epoch, a config digest only then.
			match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
				(true, true, _) => {},
				(false, false, false) => {},
				(false, false, true) => {
					return Err(ConsensusError::ClientImport(
						babe_err(Error::<Block>::UnexpectedConfigChange).into(),
					))
				},
				(true, false, _) => {
					return Err(ConsensusError::ClientImport(
						babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
					))
				},
				(false, true, _) => {
					return Err(ConsensusError::ClientImport(
						babe_err(Error::<Block>::UnexpectedEpochChange).into(),
					))
				},
			}

			if let Some(next_epoch_descriptor) = next_epoch_digest {
				// Snapshot the tree so we can roll back on any later failure.
				old_epoch_changes = Some((*epoch_changes).clone());

				let mut viable_epoch = epoch_changes
					.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
					.ok_or_else(|| {
						ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
					})?
					.into_cloned();

				let epoch_config = next_config_digest
					.map(Into::into)
					.unwrap_or_else(|| viable_epoch.as_ref().config.clone());

				// restrict info logging during initial sync to avoid spam
				let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
					log::Level::Debug
				} else {
					log::Level::Info
				};

				if viable_epoch.as_ref().end_slot() <= slot {
					// Some epochs must have been skipped as our current slot fits outside the
					// current epoch. We will figure out which epoch it belongs to and we will
					// re-use the same data for that epoch.
					// Notice that we are only updating a local copy of the `Epoch`, this
					// makes it so that when we insert the next epoch into `EpochChanges` below
					// (after incrementing it), it will use the correct epoch index and start
					// slot. We do not update the original epoch that will be re-used
					// because there might be other forks (that we haven't imported) where
					// the epoch isn't skipped, and to import those forks we want to keep
					// the original epoch data. Not updating the original epoch works
					// because when we search the tree for which epoch to use for a given
					// slot, we will search in-depth with the predicate `epoch.start_slot
					// <= slot` which will still match correctly without updating
					// `start_slot` to the correct value as below.
					let epoch = viable_epoch.as_mut();
					let prev_index = epoch.epoch_index;
					*epoch = epoch.clone_for_slot(slot);

					warn!(
						target: LOG_TARGET,
						"👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
					);
				}

				log!(
					target: LOG_TARGET,
					log_level,
					"👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
					viable_epoch.as_ref().epoch_index,
					hash,
					slot,
					viable_epoch.as_ref().start_slot,
				);

				let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));

				log!(
					target: LOG_TARGET,
					log_level,
					"👶 Next epoch starts at slot {}",
					next_epoch.as_ref().start_slot,
				);

				// prune the tree of epochs not part of the finalized chain or
				// that are not live anymore, and then track the given epoch change
				// in the tree.
				// NOTE: it is important that these operations are done in this
				// order, otherwise if pruning after import the `is_descendent_of`
				// used by pruning may not know about the block that is being
				// imported.
				let prune_and_import = || {
					prune_finalized(self.client.clone(), &mut epoch_changes)?;

					epoch_changes
						.import(
							descendent_query(&*self.client),
							hash,
							number,
							*block.header.parent_hash(),
							next_epoch,
						)
						.map_err(|e| {
							ConsensusError::ClientImport(format!(
								"Error importing epoch changes: {}",
								e
							))
						})?;
					Ok(())
				};

				if let Err(e) = prune_and_import() {
					// Restore the pre-change tree before bailing out.
					debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
					*epoch_changes =
						old_epoch_changes.expect("set `Some` above and not taken; qed");
					return Err(e);
				}

				crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
					block
						.auxiliary
						.extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
				});
			}

			// Persist the chain weight of this block alongside the block itself.
			aux_schema::write_block_weight(hash, total_weight, |values| {
				block
					.auxiliary
					.extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
			});

			// The fork choice rule is that we pick the heaviest chain (i.e.
			// more primary blocks), if there's a tie we go with the longest
			// chain.
			block.fork_choice = {
				let (last_best, last_best_number) = (info.best_hash, info.best_number);

				let last_best_weight = if &last_best == block.header.parent_hash() {
					// the parent=genesis case is already covered for loading parent weight,
					// so we don't need to cover again here.
					parent_weight
				} else {
					// NOTE(review): this error text says "parent header" but it
					// refers to the current best block's missing weight.
					aux_schema::load_block_weight(&*self.client, last_best)
						.map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
						.ok_or_else(|| {
							ConsensusError::ChainLookup(
								"No block weight for parent header.".to_string(),
							)
						})?
				};

				Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
					true
				} else if total_weight == last_best_weight {
					number > last_best_number
				} else {
					false
				}))
			};

			// Release the mutex, but it stays locked
			Some(epoch_changes.release_mutex())
		} else {
			block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
			None
		};

		let import_result = self.inner.import_block(block).await;

		// revert to the original epoch changes in case there's an error
		// importing the block
		if import_result.is_err() {
			if let (Some(mut epoch_changes), Some(old_epoch_changes)) =
				(epoch_changes, old_epoch_changes)
			{
				*epoch_changes.upgrade() = old_epoch_changes;
			}
		}

		import_result.map_err(Into::into)
	}

	// Delegate block checks directly to the wrapped import.
	async fn check_block(
		&self,
		block: BlockCheckParams<Block>,
	) -> Result<ImportResult, Self::Error> {
		self.inner.check_block(block).await.map_err(Into::into)
	}
}
1746
1747/// Gets the best finalized block and its slot, and prunes the given epoch tree.
1748fn prune_finalized<Block, Client>(
1749	client: Arc<Client>,
1750	epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1751) -> Result<(), ConsensusError>
1752where
1753	Block: BlockT,
1754	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
1755{
1756	let info = client.info();
1757
1758	let finalized_slot = {
1759		let finalized_header = client
1760			.header(info.finalized_hash)
1761			.map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1762			.expect(
1763				"best finalized hash was given by client; finalized headers must exist in db; qed",
1764			);
1765
1766		find_pre_digest::<Block>(&finalized_header)
1767			.expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1768			.slot()
1769	};
1770
1771	epoch_changes
1772		.prune_finalized(
1773			descendent_query(&*client),
1774			&info.finalized_hash,
1775			info.finalized_number,
1776			finalized_slot,
1777		)
1778		.map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1779
1780	Ok(())
1781}
1782
1783/// Produce a BABE block-import object to be used later on in the construction of
1784/// an import-queue.
1785///
1786/// Also returns a link object used to correctly instantiate the import queue
1787/// and background worker.
1788pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1789	config: BabeConfiguration,
1790	wrapped_block_import: I,
1791	client: Arc<Client>,
1792	create_inherent_data_providers: CIDP,
1793	select_chain: SC,
1794	offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1795) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1796where
1797	Client: AuxStore
1798		+ HeaderBackend<Block>
1799		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1800		+ PreCommitActions<Block>
1801		+ 'static,
1802{
1803	let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1804	let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1805
1806	// NOTE: this isn't entirely necessary, but since we didn't use to prune the
1807	// epoch tree it is useful as a migration, so that nodes prune long trees on
1808	// startup rather than waiting until importing the next epoch change block.
1809	prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1810
1811	let client_weak = Arc::downgrade(&client);
1812	let on_finality = move |summary: &FinalityNotification<Block>| {
1813		if let Some(client) = client_weak.upgrade() {
1814			aux_storage_cleanup(client.as_ref(), summary)
1815		} else {
1816			Default::default()
1817		}
1818	};
1819	client.register_finality_action(Box::new(on_finality));
1820
1821	let import = BabeBlockImport::new(
1822		client,
1823		epoch_changes,
1824		wrapped_block_import,
1825		config,
1826		create_inherent_data_providers,
1827		select_chain,
1828		offchain_tx_pool_factory,
1829	);
1830
1831	Ok((import, link))
1832}
1833
/// Parameters passed to [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
	/// The BABE link that is created by [`block_import`].
	pub link: BabeLink<Block>,
	/// The block import that should be wrapped.
	///
	/// This should be the [`BabeBlockImport`] (or a wrapper of it); otherwise
	/// crucial BABE import logic will be omitted (see [`import_queue`]).
	pub block_import: BI,
	/// Optional justification import.
	pub justification_import: Option<BoxJustificationImport<Block>>,
	/// The client to interact with the internals of the node.
	pub client: Arc<Client>,
	/// Slot duration used when verifying the timing of incoming blocks.
	pub slot_duration: SlotDuration,
	/// Spawner for spawning futures (e.g. the background request-answering
	/// worker).
	pub spawner: &'a Spawn,
	/// Registry for prometheus metrics.
	pub registry: Option<&'a Registry>,
	/// Optional telemetry handle to report telemetry events.
	pub telemetry: Option<TelemetryHandle>,
}
1853
1854/// Start an import queue for the BABE consensus algorithm.
1855///
1856/// This method returns the import queue, some data that needs to be passed to the block authoring
1857/// logic (`BabeLink`), and a future that must be run to
1858/// completion and is responsible for listening to finality notifications and
1859/// pruning the epoch changes tree.
1860///
1861/// The block import object provided must be the `BabeBlockImport` or a wrapper
1862/// of it, otherwise crucial import logic will be omitted.
1863pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1864	ImportQueueParams {
1865		link: babe_link,
1866		block_import,
1867		justification_import,
1868		client,
1869		slot_duration,
1870		spawner,
1871		registry,
1872		telemetry,
1873	}: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1874) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1875where
1876	BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1877	Client: ProvideRuntimeApi<Block>
1878		+ HeaderBackend<Block>
1879		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1880		+ AuxStore
1881		+ Send
1882		+ Sync
1883		+ 'static,
1884	Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1885	Spawn: SpawnEssentialNamed,
1886{
1887	const HANDLE_BUFFER_SIZE: usize = 1024;
1888
1889	let verifier = BabeVerifier {
1890		slot_duration,
1891		config: babe_link.config.clone(),
1892		epoch_changes: babe_link.epoch_changes.clone(),
1893		telemetry,
1894		client: client.clone(),
1895	};
1896
1897	let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1898
1899	let answer_requests =
1900		answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1901
1902	spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1903
1904	Ok((
1905		BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1906		BabeWorkerHandle(worker_tx),
1907	))
1908}
1909
1910/// Reverts protocol aux data to at most the last finalized block.
1911/// In particular, epoch-changes and block weights announced after the revert
1912/// point are removed.
1913pub fn revert<Block, Client, Backend>(
1914	client: Arc<Client>,
1915	backend: Arc<Backend>,
1916	blocks: NumberFor<Block>,
1917) -> ClientResult<()>
1918where
1919	Block: BlockT,
1920	Client: AuxStore
1921		+ HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1922		+ HeaderBackend<Block>
1923		+ ProvideRuntimeApi<Block>
1924		+ UsageProvider<Block>,
1925	Client::Api: BabeApi<Block>,
1926	Backend: BackendT<Block>,
1927{
1928	let best_number = client.info().best_number;
1929	let finalized = client.info().finalized_number;
1930
1931	let revertible = blocks.min(best_number - finalized);
1932	if revertible == Zero::zero() {
1933		return Ok(());
1934	}
1935
1936	let revert_up_to_number = best_number - revertible;
1937	let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1938		format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1939	))?;
1940
1941	// Revert epoch changes tree.
1942
1943	// This config is only used on-genesis.
1944	let config = configuration(&*client)?;
1945	let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1946	let mut epoch_changes = epoch_changes.shared_data();
1947
1948	if revert_up_to_number == Zero::zero() {
1949		// Special case, no epoch changes data were present on genesis.
1950		*epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1951	} else {
1952		epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1953	}
1954
1955	// Remove block weights added after the revert point.
1956
1957	let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1958
1959	let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1960		soil_client::blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1961			.map(|route| route.retracted().is_empty())
1962			.unwrap_or_default()
1963	});
1964
1965	for leaf in leaves {
1966		let mut hash = leaf;
1967		loop {
1968			let meta = client.header_metadata(hash)?;
1969			if meta.number <= revert_up_to_number
1970				|| !weight_keys.insert(aux_schema::block_weight_key(hash))
1971			{
1972				// We've reached the revert point or an already processed branch, stop here.
1973				break;
1974			}
1975			hash = meta.parent;
1976		}
1977	}
1978
1979	let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1980
1981	// Write epoch changes and remove weights in one shot.
1982	aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1983		client.insert_aux(values, weight_keys.iter())
1984	})
1985}
1986
1987fn query_epoch_changes<Block, Client>(
1988	epoch_changes: &SharedEpochChanges<Block, Epoch>,
1989	client: &Client,
1990	config: &BabeConfiguration,
1991	block_number: NumberFor<Block>,
1992	slot: Slot,
1993	parent_hash: Block::Hash,
1994) -> Result<
1995	(ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
1996	Error<Block>,
1997>
1998where
1999	Block: BlockT,
2000	Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
2001{
2002	let epoch_changes = epoch_changes.shared_data();
2003	let epoch_descriptor = epoch_changes
2004		.epoch_descriptor_for_child_of(
2005			descendent_query(client),
2006			&parent_hash,
2007			block_number - 1u32.into(),
2008			slot,
2009		)
2010		.map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
2011		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2012	let viable_epoch = epoch_changes
2013		.viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2014		.ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2015	Ok((epoch_descriptor, viable_epoch.into_cloned()))
2016}