1#![forbid(unsafe_code)]
67#![warn(missing_docs)]
68
69use std::{
70 collections::HashSet,
71 future::Future,
72 ops::{Deref, DerefMut},
73 pin::Pin,
74 sync::Arc,
75 task::{Context, Poll},
76 time::Duration,
77};
78
79use codec::{Decode, Encode};
80use futures::{
81 channel::{
82 mpsc::{channel, Receiver, Sender},
83 oneshot,
84 },
85 prelude::*,
86};
87use log::{debug, info, log, trace, warn};
88use parking_lot::Mutex;
89use prometheus_endpoint::Registry;
90
91use sc_client_api::{
92 backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
93 PreCommitActions, UsageProvider,
94};
95use sc_consensus::{
96 block_import::{
97 BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
98 StateAction,
99 },
100 import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
101};
102use sc_consensus_epochs::{
103 descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
104 ViableEpochDescriptor,
105};
106use sc_consensus_slots::{
107 check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
108 SlotInfo, StorageChanges,
109};
110use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
111use sc_transaction_pool_api::OffchainTransactionPoolFactory;
112use sp_api::{ApiExt, ProvideRuntimeApi};
113use sp_application_crypto::AppCrypto;
114use sp_block_builder::BlockBuilder as BlockBuilderApi;
115use sp_blockchain::{
116 Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
117 Result as ClientResult,
118};
119use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
120use sp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
121use sp_consensus_slots::Slot;
122use sp_core::traits::SpawnEssentialNamed;
123use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
124use sp_keystore::KeystorePtr;
125use sp_runtime::{
126 generic::OpaqueDigestItemId,
127 traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
128 DigestItem,
129};
130
131pub use sc_consensus_slots::SlotProportion;
132pub use sp_consensus::SyncOracle;
133pub use sp_consensus_babe::{
134 digests::{
135 CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
136 PrimaryPreDigest, SecondaryPlainPreDigest,
137 },
138 AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
139 BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
140};
141
142pub use aux_schema::load_block_weight as block_weight;
143use sp_timestamp::Timestamp;
144
145mod migration;
146mod verification;
147
148pub mod authorship;
149pub mod aux_schema;
150#[cfg(test)]
151mod tests;
152
/// Log target passed to every `log` macro invocation in this file.
const LOG_TARGET: &str = "babe";

/// VRF context bytes for the authoring score.
// NOTE(review): not referenced in the visible part of this file — confirm where it is consumed.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";

/// Length of the authoring score (presumably in bytes — TODO confirm against its user).
const AUTHORING_SCORE_LENGTH: usize = 16;
160
161#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
163pub struct Epoch(sp_consensus_babe::Epoch);
164
165impl Deref for Epoch {
166 type Target = sp_consensus_babe::Epoch;
167
168 fn deref(&self) -> &Self::Target {
169 &self.0
170 }
171}
172
173impl DerefMut for Epoch {
174 fn deref_mut(&mut self) -> &mut Self::Target {
175 &mut self.0
176 }
177}
178
179impl From<sp_consensus_babe::Epoch> for Epoch {
180 fn from(epoch: sp_consensus_babe::Epoch) -> Self {
181 Epoch(epoch)
182 }
183}
184
185impl EpochT for Epoch {
186 type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
187 type Slot = Slot;
188
189 fn increment(
190 &self,
191 (descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
192 ) -> Epoch {
193 sp_consensus_babe::Epoch {
194 epoch_index: self.epoch_index + 1,
195 start_slot: self.start_slot + self.duration,
196 duration: self.duration,
197 authorities: descriptor.authorities,
198 randomness: descriptor.randomness,
199 config,
200 }
201 .into()
202 }
203
204 fn start_slot(&self) -> Slot {
205 self.start_slot
206 }
207
208 fn end_slot(&self) -> Slot {
209 self.start_slot + self.duration
210 }
211}
212
213impl Epoch {
214 pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
218 sp_consensus_babe::Epoch {
219 epoch_index: 0,
220 start_slot: slot,
221 duration: genesis_config.epoch_length,
222 authorities: genesis_config.authorities.clone(),
223 randomness: genesis_config.randomness,
224 config: BabeEpochConfiguration {
225 c: genesis_config.c,
226 allowed_slots: genesis_config.allowed_slots,
227 },
228 }
229 .into()
230 }
231
232 pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
240 let mut epoch = self.clone();
241
242 let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;
243
244 let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
245 "epoch number is u64; it should be strictly smaller than number of slots; \
246 slots relate in some way to wall clock time; \
247 if u64 is not enough we should crash for safety; qed.",
248 );
249
250 let start_slot = skipped_epochs
251 .checked_mul(epoch.duration)
252 .and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
253 .expect(
254 "slot number is u64; it should relate in some way to wall clock time; \
255 if u64 is not enough we should crash for safety; qed.",
256 );
257
258 epoch.epoch_index = epoch_index;
259 epoch.start_slot = Slot::from(start_slot);
260
261 epoch
262 }
263}
264
265#[derive(Debug, thiserror::Error)]
267pub enum Error<B: BlockT> {
268 #[error("Multiple BABE pre-runtime digests, rejecting!")]
270 MultiplePreRuntimeDigests,
271 #[error("No BABE pre-runtime digest found")]
273 NoPreRuntimeDigest,
274 #[error("Multiple BABE epoch change digests, rejecting!")]
276 MultipleEpochChangeDigests,
277 #[error("Multiple BABE config change digests, rejecting!")]
279 MultipleConfigChangeDigests,
280 #[error("Could not extract timestamp and slot: {0}")]
282 Extraction(ConsensusError),
283 #[error("Could not fetch epoch at {0:?}")]
285 FetchEpoch(B::Hash),
286 #[error("Header {0:?} rejected: too far in the future")]
288 TooFarInFuture(B::Hash),
289 #[error("Parent ({0}) of {1} unavailable. Cannot import")]
291 ParentUnavailable(B::Hash, B::Hash),
292 #[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
294 SlotMustIncrease(Slot, Slot),
295 #[error("Header {0:?} has a bad seal")]
297 HeaderBadSeal(B::Hash),
298 #[error("Header {0:?} is unsealed")]
300 HeaderUnsealed(B::Hash),
301 #[error("Slot author not found")]
303 SlotAuthorNotFound,
304 #[error("Secondary slot assignments are disabled for the current epoch.")]
306 SecondarySlotAssignmentsDisabled,
307 #[error("Bad signature on {0:?}")]
309 BadSignature(B::Hash),
310 #[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
312 InvalidAuthor(AuthorityId, AuthorityId),
313 #[error("No secondary author expected.")]
315 NoSecondaryAuthorExpected,
316 #[error("VRF verification failed")]
318 VrfVerificationFailed,
319 #[error("VRF output rejected, threshold {0} exceeded")]
321 VrfThresholdExceeded(u128),
322 #[error("Could not fetch parent header: {0}")]
324 FetchParentHeader(sp_blockchain::Error),
325 #[error("Expected epoch change to happen at {0:?}, s{1}")]
327 ExpectedEpochChange(B::Hash, Slot),
328 #[error("Unexpected config change")]
330 UnexpectedConfigChange,
331 #[error("Unexpected epoch change")]
333 UnexpectedEpochChange,
334 #[error("Parent block of {0} has no associated weight")]
336 ParentBlockNoAssociatedWeight(B::Hash),
337 #[error("Checking inherents failed: {0}")]
339 CheckInherents(sp_inherents::Error),
340 #[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
342 CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
343 #[error("Creating inherents failed: {0}")]
345 CreateInherents(sp_inherents::Error),
346 #[error("Background worker is not running")]
348 BackgroundWorkerTerminated,
349 #[error(transparent)]
351 Client(sp_blockchain::Error),
352 #[error(transparent)]
354 RuntimeApi(sp_api::ApiError),
355 #[error(transparent)]
357 ForkTree(Box<fork_tree::Error<sp_blockchain::Error>>),
358}
359
360impl<B: BlockT> From<Error<B>> for String {
361 fn from(error: Error<B>) -> String {
362 error.to_string()
363 }
364}
365
366fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
367 debug!(target: LOG_TARGET, "{}", error);
368 error
369}
370
371pub struct BabeIntermediate<B: BlockT> {
373 pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
375}
376
/// Key under which a [`BabeIntermediate`] is inserted into the block import
/// parameters.
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
379
380pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
382where
383 C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
384 C::Api: BabeApi<B>,
385{
386 let at_hash = if client.usage_info().chain.finalized_state.is_some() {
387 client.usage_info().chain.best_hash
388 } else {
389 debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
390 client.usage_info().chain.genesis_hash
391 };
392
393 let runtime_api = client.runtime_api();
394 let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
395
396 let config = match version {
397 Some(1) => {
398 #[allow(deprecated)]
399 {
400 runtime_api.configuration_before_version_2(at_hash)?.into()
401 }
402 },
403 Some(2) => runtime_api.configuration(at_hash)?,
404 _ => {
405 return Err(sp_blockchain::Error::VersionInvalid(
406 "Unsupported or invalid BabeApi version".to_string(),
407 ))
408 },
409 };
410 Ok(config)
411}
412
413pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
415 pub keystore: KeystorePtr,
417
418 pub client: Arc<C>,
420
421 pub select_chain: SC,
423
424 pub env: E,
426
427 pub block_import: I,
431
432 pub sync_oracle: SO,
434
435 pub justification_sync_link: L,
437
438 pub create_inherent_data_providers: CIDP,
440
441 pub force_authoring: bool,
443
444 pub backoff_authoring_blocks: Option<BS>,
446
447 pub babe_link: BabeLink<B>,
449
450 pub block_proposal_slot_portion: SlotProportion,
456
457 pub max_block_proposal_slot_portion: Option<SlotProportion>,
460
461 pub telemetry: Option<TelemetryHandle>,
463}
464
465pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
467 BabeParams {
468 keystore,
469 client,
470 select_chain,
471 env,
472 block_import,
473 sync_oracle,
474 justification_sync_link,
475 create_inherent_data_providers,
476 force_authoring,
477 backoff_authoring_blocks,
478 babe_link,
479 block_proposal_slot_portion,
480 max_block_proposal_slot_portion,
481 telemetry,
482 }: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
483) -> Result<BabeWorker<B>, ConsensusError>
484where
485 B: BlockT,
486 C: ProvideRuntimeApi<B>
487 + HeaderBackend<B>
488 + HeaderMetadata<B, Error = ClientError>
489 + Send
490 + Sync
491 + 'static,
492 C::Api: BabeApi<B>,
493 SC: SelectChain<B> + 'static,
494 E: Environment<B, Error = Error> + Send + Sync + 'static,
495 E::Proposer: Proposer<B, Error = Error>,
496 I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
497 SO: SyncOracle + Send + Sync + Clone + 'static,
498 L: sc_consensus::JustificationSyncLink<B> + 'static,
499 CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
500 CIDP::InherentDataProviders: InherentDataProviderExt + Send,
501 BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
502 Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
503{
504 let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
505
506 let worker = BabeSlotWorker {
507 client: client.clone(),
508 block_import,
509 env,
510 sync_oracle: sync_oracle.clone(),
511 justification_sync_link,
512 force_authoring,
513 backoff_authoring_blocks,
514 keystore,
515 epoch_changes: babe_link.epoch_changes.clone(),
516 slot_notification_sinks: slot_notification_sinks.clone(),
517 config: babe_link.config.clone(),
518 block_proposal_slot_portion,
519 max_block_proposal_slot_portion,
520 telemetry,
521 };
522
523 info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");
524
525 let slot_worker = sc_consensus_slots::start_slot_worker(
526 babe_link.config.slot_duration(),
527 select_chain,
528 sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
529 sync_oracle,
530 create_inherent_data_providers,
531 );
532
533 Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
534}
535
536fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
540 client: &C,
541 notification: &FinalityNotification<Block>,
542) -> AuxDataOperations {
543 let mut hashes = HashSet::new();
544
545 let first = notification.tree_route.first().unwrap_or(¬ification.hash);
546 match client.header_metadata(*first) {
547 Ok(meta) => {
548 hashes.insert(meta.parent);
549 },
550 Err(err) => {
551 warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
552 },
553 }
554
555 hashes.extend(
557 notification
558 .tree_route
559 .iter()
560 .filter(|h| **h != notification.hash),
563 );
564
565 hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));
566
567 hashes
568 .into_iter()
569 .map(|val| (aux_schema::block_weight_key(val), None))
570 .collect()
571}
572
573async fn answer_requests<B: BlockT, C>(
574 mut request_rx: Receiver<BabeRequest<B>>,
575 config: BabeConfiguration,
576 client: Arc<C>,
577 epoch_changes: SharedEpochChanges<B, Epoch>,
578) where
579 C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
580{
581 while let Some(request) = request_rx.next().await {
582 match request {
583 BabeRequest::EpochData(response) => {
584 let _ = response.send(epoch_changes.shared_data().clone());
585 },
586 BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
587 let lookup = || {
588 let epoch_changes = epoch_changes.shared_data();
589 epoch_changes
590 .epoch_data_for_child_of(
591 descendent_query(&*client),
592 &parent_hash,
593 parent_number,
594 slot,
595 |slot| Epoch::genesis(&config, slot),
596 )
597 .map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
598 .ok_or(Error::<B>::FetchEpoch(parent_hash))
599 };
600
601 let _ = response.send(lookup());
602 },
603 }
604 }
605}
606
607enum BabeRequest<B: BlockT> {
609 EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
611 EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
615}
616
617#[derive(Clone)]
619pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
620
621impl<B: BlockT> BabeWorkerHandle<B> {
622 async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
623 match self.0.clone().send(request).await {
624 Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
625 Err(err) => warn!(
626 target: LOG_TARGET,
627 "Unhandled error when sending request to worker: {:?}", err
628 ),
629 _ => {},
630 }
631
632 Ok(())
633 }
634
635 pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
637 let (tx, rx) = oneshot::channel();
638 self.send_request(BabeRequest::EpochData(tx)).await?;
639
640 rx.await.or(Err(Error::BackgroundWorkerTerminated))
641 }
642
643 pub async fn epoch_data_for_child_of(
647 &self,
648 parent_hash: B::Hash,
649 parent_number: NumberFor<B>,
650 slot: Slot,
651 ) -> Result<Epoch, Error<B>> {
652 let (tx, rx) = oneshot::channel();
653 self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
654 .await?;
655
656 rx.await.or(Err(Error::BackgroundWorkerTerminated))?
657 }
658}
659
660#[must_use]
662pub struct BabeWorker<B: BlockT> {
663 inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
664 slot_notification_sinks: SlotNotificationSinks<B>,
665}
666
667impl<B: BlockT> BabeWorker<B> {
668 pub fn slot_notification_stream(
671 &self,
672 ) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
673 const CHANNEL_BUFFER_SIZE: usize = 1024;
674
675 let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
676 self.slot_notification_sinks.lock().push(sink);
677 stream
678 }
679}
680
681impl<B: BlockT> Future for BabeWorker<B> {
682 type Output = ();
683
684 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
685 self.inner.as_mut().poll(cx)
686 }
687}
688
689type SlotNotificationSinks<B> = Arc<
691 Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
692>;
693
694struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
695 client: Arc<C>,
696 block_import: I,
697 env: E,
698 sync_oracle: SO,
699 justification_sync_link: L,
700 force_authoring: bool,
701 backoff_authoring_blocks: Option<BS>,
702 keystore: KeystorePtr,
703 epoch_changes: SharedEpochChanges<B, Epoch>,
704 slot_notification_sinks: SlotNotificationSinks<B>,
705 config: BabeConfiguration,
706 block_proposal_slot_portion: SlotProportion,
707 max_block_proposal_slot_portion: Option<SlotProportion>,
708 telemetry: Option<TelemetryHandle>,
709}
710
711#[async_trait::async_trait]
712impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
713 for BabeSlotWorker<B, C, E, I, SO, L, BS>
714where
715 B: BlockT,
716 C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
717 C::Api: BabeApi<B>,
718 E: Environment<B, Error = Error> + Send + Sync,
719 E::Proposer: Proposer<B, Error = Error>,
720 I: BlockImport<B> + Send + Sync + 'static,
721 SO: SyncOracle + Send + Clone + Sync,
722 L: sc_consensus::JustificationSyncLink<B>,
723 BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
724 Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
725{
726 type Claim = (PreDigest, AuthorityId);
727 type SyncOracle = SO;
728 type JustificationSyncLink = L;
729 type CreateProposer =
730 Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
731 type Proposer = E::Proposer;
732 type BlockImport = I;
733 type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;
734
735 fn logging_target(&self) -> &'static str {
736 LOG_TARGET
737 }
738
739 fn block_import(&mut self) -> &mut Self::BlockImport {
740 &mut self.block_import
741 }
742
743 fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
744 self.epoch_changes
745 .shared_data()
746 .epoch_descriptor_for_child_of(
747 descendent_query(&*self.client),
748 &parent.hash(),
749 *parent.number(),
750 slot,
751 )
752 .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
753 .ok_or(ConsensusError::InvalidAuthoritiesSet)
754 }
755
756 fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
757 self.epoch_changes
758 .shared_data()
759 .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
760 .map(|epoch| epoch.as_ref().authorities.len())
761 }
762
763 async fn claim_slot(
764 &mut self,
765 _parent_header: &B::Header,
766 slot: Slot,
767 epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
768 ) -> Option<Self::Claim> {
769 debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
770 let s = authorship::claim_slot(
771 slot,
772 self.epoch_changes
773 .shared_data()
774 .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
775 .as_ref(),
776 &self.keystore,
777 );
778
779 if s.is_some() {
780 debug!(target: LOG_TARGET, "Claimed slot {}", slot);
781 }
782
783 s
784 }
785
786 fn notify_slot(
787 &self,
788 _parent_header: &B::Header,
789 slot: Slot,
790 epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
791 ) {
792 let sinks = &mut self.slot_notification_sinks.lock();
793 sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
794 Ok(()) => true,
795 Err(e) => {
796 if e.is_full() {
797 warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
798 true
799 } else {
800 false
801 }
802 },
803 });
804 }
805
806 fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<sp_runtime::DigestItem> {
807 vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
808 }
809
810 async fn block_import_params(
811 &self,
812 header: B::Header,
813 header_hash: &B::Hash,
814 body: Vec<B::Extrinsic>,
815 storage_changes: StorageChanges<B>,
816 (_, public): Self::Claim,
817 epoch_descriptor: Self::AuxData,
818 ) -> Result<BlockImportParams<B>, ConsensusError> {
819 let signature = self
820 .keystore
821 .sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
822 .map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
823 .ok_or_else(|| {
824 ConsensusError::CannotSign(format!(
825 "Could not find key in keystore. Key: {:?}",
826 public
827 ))
828 })?;
829
830 let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());
831
832 let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
833 import_block.post_digests.push(digest_item);
834 import_block.body = Some(body);
835 import_block.state_action =
836 StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
837 import_block
838 .insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });
839
840 Ok(import_block)
841 }
842
843 fn force_authoring(&self) -> bool {
844 self.force_authoring
845 }
846
847 fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
848 if let Some(ref strategy) = self.backoff_authoring_blocks {
849 if let Ok(chain_head_slot) =
850 find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
851 {
852 return strategy.should_backoff(
853 *chain_head.number(),
854 chain_head_slot,
855 self.client.info().finalized_number,
856 slot,
857 self.logging_target(),
858 );
859 }
860 }
861 false
862 }
863
864 fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
865 &mut self.sync_oracle
866 }
867
868 fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
869 &mut self.justification_sync_link
870 }
871
872 fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
873 Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
874 }
875
876 fn telemetry(&self) -> Option<TelemetryHandle> {
877 self.telemetry.clone()
878 }
879
880 fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
881 let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());
882
883 sc_consensus_slots::proposing_remaining_duration(
884 parent_slot,
885 slot_info,
886 &self.block_proposal_slot_portion,
887 self.max_block_proposal_slot_portion.as_ref(),
888 sc_consensus_slots::SlotLenienceType::Exponential,
889 self.logging_target(),
890 )
891 }
892}
893
894pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
897 if header.number().is_zero() {
900 return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
901 slot: 0.into(),
902 authority_index: 0,
903 }));
904 }
905
906 let mut pre_digest: Option<_> = None;
907 for log in header.digest().logs() {
908 trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
909 match (log.as_babe_pre_digest(), pre_digest.is_some()) {
910 (Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
911 (None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
912 (s, false) => pre_digest = s,
913 }
914 }
915 pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
916}
917
918pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
920 find_next_epoch_digest::<B>(header).ok().flatten().is_some()
921}
922
923pub fn find_next_epoch_digest<B: BlockT>(
925 header: &B::Header,
926) -> Result<Option<NextEpochDescriptor>, Error<B>> {
927 let mut epoch_digest: Option<_> = None;
928 for log in header.digest().logs() {
929 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
930 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
931 match (log, epoch_digest.is_some()) {
932 (Some(ConsensusLog::NextEpochData(_)), true) => {
933 return Err(babe_err(Error::MultipleEpochChangeDigests))
934 },
935 (Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
936 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
937 }
938 }
939
940 Ok(epoch_digest)
941}
942
943fn find_next_config_digest<B: BlockT>(
945 header: &B::Header,
946) -> Result<Option<NextConfigDescriptor>, Error<B>> {
947 let mut config_digest: Option<_> = None;
948 for log in header.digest().logs() {
949 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
950 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
951 match (log, config_digest.is_some()) {
952 (Some(ConsensusLog::NextConfigData(_)), true) => {
953 return Err(babe_err(Error::MultipleConfigChangeDigests))
954 },
955 (Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
956 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
957 }
958 }
959
960 Ok(config_digest)
961}
962
963#[derive(Clone)]
965pub struct BabeLink<Block: BlockT> {
966 epoch_changes: SharedEpochChanges<Block, Epoch>,
967 config: BabeConfiguration,
968}
969
970impl<Block: BlockT> BabeLink<Block> {
971 pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
973 &self.epoch_changes
974 }
975
976 pub fn config(&self) -> &BabeConfiguration {
978 &self.config
979 }
980}
981
982pub struct BabeVerifier<Block: BlockT, Client> {
984 client: Arc<Client>,
985 slot_duration: SlotDuration,
986 config: BabeConfiguration,
987 epoch_changes: SharedEpochChanges<Block, Epoch>,
988 telemetry: Option<TelemetryHandle>,
989}
990
991#[async_trait::async_trait]
992impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
993where
994 Block: BlockT,
995 Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
996 + HeaderBackend<Block>
997 + ProvideRuntimeApi<Block>
998 + Send
999 + Sync
1000 + AuxStore,
1001 Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
1002{
1003 async fn verify(
1004 &self,
1005 mut block: BlockImportParams<Block>,
1006 ) -> Result<BlockImportParams<Block>, String> {
1007 trace!(
1008 target: LOG_TARGET,
1009 "Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
1010 block.origin,
1011 block.header,
1012 block.justifications,
1013 block.body,
1014 );
1015
1016 let hash = block.header.hash();
1017 let parent_hash = *block.header.parent_hash();
1018
1019 let number = block.header.number();
1020
1021 if should_skip_verification(&*self.client, &block) {
1022 return Ok(block);
1023 }
1024
1025 debug!(
1026 target: LOG_TARGET,
1027 "We have {:?} logs in this header",
1028 block.header.digest().logs().len()
1029 );
1030
1031 let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);
1032
1033 let pre_digest = find_pre_digest::<Block>(&block.header)?;
1034 let (check_header, epoch_descriptor) = {
1035 let (epoch_descriptor, viable_epoch) = query_epoch_changes(
1036 &self.epoch_changes,
1037 self.client.as_ref(),
1038 &self.config,
1039 *number,
1040 pre_digest.slot(),
1041 parent_hash,
1042 )?;
1043
1044 let v_params = verification::VerificationParams {
1047 header: block.header.clone(),
1048 pre_digest: Some(pre_digest),
1049 slot_now: slot_now + 1,
1050 epoch: viable_epoch.as_ref(),
1051 };
1052
1053 (verification::check_header::<Block>(v_params)?, epoch_descriptor)
1054 };
1055
1056 match check_header {
1057 CheckedHeader::Checked(pre_header, verified_info) => {
1058 trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
1059 telemetry!(
1060 self.telemetry;
1061 CONSENSUS_TRACE;
1062 "babe.checked_and_importing";
1063 "pre_header" => ?pre_header,
1064 );
1065
1066 block.header = pre_header;
1067 block.post_digests.push(verified_info.seal);
1068 block.insert_intermediate(
1069 INTERMEDIATE_KEY,
1070 BabeIntermediate::<Block> { epoch_descriptor },
1071 );
1072 block.post_hash = Some(hash);
1073
1074 Ok(block)
1075 },
1076 CheckedHeader::Deferred(a, b) => {
1077 debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
1078 telemetry!(
1079 self.telemetry;
1080 CONSENSUS_DEBUG;
1081 "babe.header_too_far_in_future";
1082 "hash" => ?hash, "a" => ?a, "b" => ?b
1083 );
1084 Err(Error::<Block>::TooFarInFuture(hash).into())
1085 },
1086 }
1087 }
1088}
1089
1090fn should_skip_verification<B: BlockT>(
1097 client: &impl HeaderBackend<B>,
1098 block: &BlockImportParams<B>,
1099) -> bool {
1100 block.origin == BlockOrigin::WarpSync || block.with_state() || {
1101 let number = *block.header.number();
1102 let info = client.info();
1103 info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
1104 }
1105}
1106
1107pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
1116 inner: I,
1117 client: Arc<Client>,
1118 epoch_changes: SharedEpochChanges<Block, Epoch>,
1119 create_inherent_data_providers: CIDP,
1120 config: BabeConfiguration,
1121 select_chain: SC,
1126 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1130}
1131
1132impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
1133 for BabeBlockImport<Block, Client, I, CIDP, SC>
1134{
1135 fn clone(&self) -> Self {
1136 BabeBlockImport {
1137 inner: self.inner.clone(),
1138 client: self.client.clone(),
1139 epoch_changes: self.epoch_changes.clone(),
1140 config: self.config.clone(),
1141 create_inherent_data_providers: self.create_inherent_data_providers.clone(),
1142 select_chain: self.select_chain.clone(),
1143 offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
1144 }
1145 }
1146}
1147
1148impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
1149 fn new(
1150 client: Arc<Client>,
1151 epoch_changes: SharedEpochChanges<Block, Epoch>,
1152 block_import: I,
1153 config: BabeConfiguration,
1154 create_inherent_data_providers: CIDP,
1155 select_chain: SC,
1156 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1157 ) -> Self {
1158 BabeBlockImport {
1159 client,
1160 inner: block_import,
1161 epoch_changes,
1162 config,
1163 create_inherent_data_providers,
1164 select_chain,
1165 offchain_tx_pool_factory,
1166 }
1167 }
1168}
1169
1170impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
1171where
1172 Block: BlockT,
1173 Inner: BlockImport<Block> + Send + Sync,
1174 Inner::Error: Into<ConsensusError>,
1175 Client: HeaderBackend<Block>
1176 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1177 + AuxStore
1178 + ProvideRuntimeApi<Block>
1179 + Send
1180 + Sync,
1181 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1182 CIDP: CreateInherentDataProviders<Block, ()>,
1183 CIDP::InherentDataProviders: InherentDataProviderExt + Send,
1184 SC: sp_consensus::SelectChain<Block> + 'static,
1185{
1186 async fn import_state(
1190 &self,
1191 mut block: BlockImportParams<Block>,
1192 ) -> Result<ImportResult, ConsensusError> {
1193 let hash = block.post_hash();
1194 let parent_hash = *block.header.parent_hash();
1195 let number = *block.header.number();
1196
1197 block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
1198 aux_schema::write_block_weight(hash, 0, |values| {
1200 block
1201 .auxiliary
1202 .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1203 });
1204
1205 let import_result = self.inner.import_block(block).await;
1207 let aux = match import_result {
1208 Ok(ImportResult::Imported(aux)) => aux,
1209 Ok(r) => {
1210 return Err(ConsensusError::ClientImport(format!(
1211 "Unexpected import result: {:?}",
1212 r
1213 )))
1214 },
1215 Err(r) => return Err(r.into()),
1216 };
1217
1218 let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
1220 ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1221 })?;
1222 let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
1223 ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1224 })?;
1225
1226 let mut epoch_changes = self.epoch_changes.shared_data_locked();
1227 epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
1228 aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
1229 self.client.insert_aux(insert, [])
1230 })
1231 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1232
1233 Ok(ImportResult::Imported(aux))
1234 }
1235
1236 async fn check_inherents_and_equivocations(
1238 &self,
1239 block: &mut BlockImportParams<Block>,
1240 ) -> Result<(), ConsensusError> {
1241 if should_skip_verification(&*self.client, block) {
1242 return Ok(());
1243 }
1244
1245 let parent_hash = *block.header.parent_hash();
1246 let number = *block.header.number();
1247
1248 let create_inherent_data_providers = self
1249 .create_inherent_data_providers
1250 .create_inherent_data_providers(parent_hash, ())
1251 .await?;
1252
1253 let slot_now = create_inherent_data_providers.slot();
1254
1255 let babe_pre_digest = find_pre_digest::<Block>(&block.header)
1256 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1257 let slot = babe_pre_digest.slot();
1258
1259 self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
1261 .await?;
1262
1263 let author = {
1265 let viable_epoch = query_epoch_changes(
1266 &self.epoch_changes,
1267 self.client.as_ref(),
1268 &self.config,
1269 number,
1270 slot,
1271 parent_hash,
1272 )
1273 .map_err(|e| ConsensusError::Other(babe_err(e).into()))?
1274 .1;
1275 match viable_epoch
1276 .as_ref()
1277 .authorities
1278 .get(babe_pre_digest.authority_index() as usize)
1279 {
1280 Some(author) => author.0.clone(),
1281 None => {
1282 return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into()))
1283 },
1284 }
1285 };
1286 if let Err(err) = self
1287 .check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
1288 .await
1289 {
1290 warn!(
1291 target: LOG_TARGET,
1292 "Error checking/reporting BABE equivocation: {}", err
1293 );
1294 }
1295 Ok(())
1296 }
1297
1298 async fn check_inherents(
1299 &self,
1300 block: &mut BlockImportParams<Block>,
1301 at_hash: Block::Hash,
1302 slot: Slot,
1303 create_inherent_data_providers: CIDP::InherentDataProviders,
1304 ) -> Result<(), ConsensusError> {
1305 if block.state_action.skip_execution_checks() {
1306 return Ok(());
1307 }
1308
1309 if let Some(inner_body) = block.body.take() {
1310 let new_block = Block::new(block.header.clone(), inner_body);
1311 let mut inherent_data = create_inherent_data_providers
1315 .create_inherent_data()
1316 .await
1317 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1318 inherent_data.babe_replace_inherent_data(slot);
1319
1320 use sp_block_builder::CheckInherentsError;
1321
1322 sp_block_builder::check_inherents_with_data(
1323 self.client.clone(),
1324 at_hash,
1325 new_block.clone(),
1326 &create_inherent_data_providers,
1327 inherent_data,
1328 )
1329 .await
1330 .map_err(|e| {
1331 ConsensusError::Other(Box::new(match e {
1332 CheckInherentsError::CreateInherentData(e) => {
1333 Error::<Block>::CreateInherents(e)
1334 },
1335 CheckInherentsError::Client(e) => Error::RuntimeApi(e),
1336 CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
1337 CheckInherentsError::CheckInherentsUnknownError(id) => {
1338 Error::CheckInherentsUnhandled(id)
1339 },
1340 }))
1341 })?;
1342 let (_, inner_body) = new_block.deconstruct();
1343 block.body = Some(inner_body);
1344 }
1345
1346 Ok(())
1347 }
1348
1349 async fn check_and_report_equivocation(
1350 &self,
1351 slot_now: Slot,
1352 slot: Slot,
1353 header: &Block::Header,
1354 author: &AuthorityId,
1355 origin: &BlockOrigin,
1356 ) -> Result<(), Error<Block>> {
1357 if *origin == BlockOrigin::NetworkInitialSync {
1360 return Ok(());
1361 }
1362
1363 let Some(equivocation_proof) =
1365 check_equivocation(&*self.client, slot_now, slot, header, author)
1366 .map_err(Error::Client)?
1367 else {
1368 return Ok(());
1369 };
1370
1371 info!(
1372 target: LOG_TARGET,
1373 "Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
1374 author,
1375 slot,
1376 equivocation_proof.first_header.hash(),
1377 equivocation_proof.second_header.hash(),
1378 );
1379
1380 let best_hash = self
1382 .select_chain
1383 .best_chain()
1384 .await
1385 .map(|h| h.hash())
1386 .map_err(|e| Error::Client(e.into()))?;
1387
1388 let generate_key_owner_proof = |at_hash: Block::Hash| {
1397 self.client
1398 .runtime_api()
1399 .generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
1400 .map_err(Error::RuntimeApi)
1401 };
1402
1403 let parent_hash = *header.parent_hash();
1404 let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
1405 Some(proof) => proof,
1406 None => match generate_key_owner_proof(best_hash)? {
1407 Some(proof) => proof,
1408 None => {
1409 debug!(
1410 target: LOG_TARGET,
1411 "Equivocation offender is not part of the authority set."
1412 );
1413 return Ok(());
1414 },
1415 },
1416 };
1417
1418 let mut runtime_api = self.client.runtime_api();
1420
1421 runtime_api
1423 .register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
1424
1425 runtime_api
1426 .submit_report_equivocation_unsigned_extrinsic(
1427 best_hash,
1428 equivocation_proof,
1429 key_owner_proof,
1430 )
1431 .map_err(Error::RuntimeApi)?;
1432
1433 info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);
1434
1435 Ok(())
1436 }
1437}
1438
1439#[async_trait::async_trait]
1440impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
1441 for BabeBlockImport<Block, Client, Inner, CIDP, SC>
1442where
1443 Block: BlockT,
1444 Inner: BlockImport<Block> + Send + Sync,
1445 Inner::Error: Into<ConsensusError>,
1446 Client: HeaderBackend<Block>
1447 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1448 + AuxStore
1449 + ProvideRuntimeApi<Block>
1450 + Send
1451 + Sync,
1452 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1453 CIDP: CreateInherentDataProviders<Block, ()>,
1454 CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
1455 SC: SelectChain<Block> + 'static,
1456{
1457 type Error = ConsensusError;
1458
    /// Import a block while maintaining the BABE epoch changes, block weights and
    /// fork choice.
    ///
    /// Inherents and equivocations are checked first. Blocks that lie inside the
    /// client's block gap or are already in chain are passed straight to the inner
    /// import and never become best. Blocks with state are handled by
    /// `import_state`. For every other block whose origin is not
    /// `BlockOrigin::WarpSync` the slot has to be strictly greater than the
    /// parent's slot, an epoch change has to be announced exactly when the block
    /// is the first one of its epoch, the total chain weight is added to the
    /// block's auxiliary data and the fork choice is derived from weight and
    /// block number.
    ///
    /// If an epoch change was imported and a later step fails, the epoch changes
    /// are restored to the snapshot taken before the modification.
    async fn import_block(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        let hash = block.post_hash();
        let parent_hash = *block.header.parent_hash();
        let number = *block.header.number();
        let info = self.client.info();

        self.check_inherents_and_equivocations(&mut block).await?;

        let block_status = self
            .client
            .status(hash)
            .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

        // Skip all BABE specific logic for blocks inside the block gap and for blocks
        // that are already part of the chain.
        if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
            block_status == BlockStatus::InChain
        {
            // Strip the BABE intermediate if there is one; a missing intermediate is
            // not treated as an error here.
            let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
            block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
            return self.inner.import_block(block).await.map_err(Into::into);
        }

        if block.with_state() {
            return self.import_state(block).await;
        }

        let pre_digest = find_pre_digest::<Block>(&block.header).expect(
            "valid babe headers must contain a predigest; header has been already verified; qed",
        );
        let slot = pre_digest.slot();

        // Snapshot of the epoch changes, taken right before they get modified, so
        // that they can be restored if anything fails afterwards.
        let mut old_epoch_changes = None;

        // Blocks with origin `WarpSync` skip the epoch and weight handling entirely.
        let epoch_changes = if block.origin != BlockOrigin::WarpSync {
            let parent_header = self
                .client
                .header(parent_hash)
                .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
                .ok_or_else(|| {
                    ConsensusError::ChainLookup(
                        babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
                    )
                })?;

            let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
                "parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
                 been verified; qed",
            );

            // The slot number has to be strictly increasing along a chain.
            if slot <= parent_slot {
                return Err(ConsensusError::ClientImport(
                    babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
                ));
            }

            let mut epoch_changes = self.epoch_changes.shared_data_locked();

            // `epoch_descriptor` comes from the intermediate attached to the block,
            // `first_in_epoch` is true when the parent's slot lies before the start
            // slot of that epoch, and `parent_weight` is the stored weight of the
            // parent (zero when the parent is block number zero).
            let (epoch_descriptor, first_in_epoch, parent_weight) = {
                let parent_weight = if *parent_header.number() == Zero::zero() {
                    0
                } else {
                    aux_schema::load_block_weight(&*self.client, parent_hash)
                        .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ClientImport(
                                babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
                                    .into(),
                            )
                        })?
                };

                let intermediate =
                    block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;

                let epoch_descriptor = intermediate.epoch_descriptor;
                let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
                (epoch_descriptor, first_in_epoch, parent_weight)
            };

            // Weight of the chain including the block being imported.
            let total_weight = parent_weight + pre_digest.added_weight();

            // Always look for both digests so that unexpected announcements can be
            // rejected below.
            let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
            let next_config_digest = find_next_config_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

            // (first block of the epoch, epoch digest present, config digest present)
            match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
                // First block of an epoch announcing the next epoch: expected.
                (true, true, _) => {},
                // Block inside an epoch announcing nothing: expected.
                (false, false, false) => {},
                // A config change without an epoch change is rejected.
                (false, false, true) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedConfigChange).into(),
                    ))
                },
                // The first block of an epoch has to announce the next epoch.
                (true, false, _) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
                    ))
                },
                // Epoch announcements are only allowed in the first block of an epoch.
                (false, true, _) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedEpochChange).into(),
                    ))
                },
            }

            if let Some(next_epoch_descriptor) = next_epoch_digest {
                // Keep a copy so the changes below can be undone on failure.
                old_epoch_changes = Some((*epoch_changes).clone());

                let mut viable_epoch = epoch_changes
                    .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
                    .ok_or_else(|| {
                        ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
                    })?
                    .into_cloned();

                // Without a config digest the next epoch keeps the current configuration.
                let epoch_config = next_config_digest
                    .map(Into::into)
                    .unwrap_or_else(|| viable_epoch.as_ref().config.clone());

                // Log the epoch messages at debug level only during the initial sync.
                let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
                    log::Level::Debug
                } else {
                    log::Level::Info
                };

                // The block's slot lies at or past the end of the epoch it is verified
                // under, so the epoch is replaced by `clone_for_slot(slot)`.
                if viable_epoch.as_ref().end_slot() <= slot {
                    let epoch = viable_epoch.as_mut();
                    let prev_index = epoch.epoch_index;
                    *epoch = epoch.clone_for_slot(slot);

                    warn!(
                        target: LOG_TARGET,
                        "👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
                    );
                }

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
                    viable_epoch.as_ref().epoch_index,
                    hash,
                    slot,
                    viable_epoch.as_ref().start_slot,
                );

                let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 Next epoch starts at slot {}",
                    next_epoch.as_ref().start_slot,
                );

                // First prune the epoch changes with respect to the finalized chain,
                // then track the newly announced epoch.
                // NOTE(review): pruning is done before the import; presumably the
                // order matters for the descendent query — confirm before reordering.
                let prune_and_import = || {
                    prune_finalized(self.client.clone(), &mut epoch_changes)?;

                    epoch_changes
                        .import(
                            descendent_query(&*self.client),
                            hash,
                            number,
                            *block.header.parent_hash(),
                            next_epoch,
                        )
                        .map_err(|e| {
                            ConsensusError::ClientImport(format!(
                                "Error importing epoch changes: {}",
                                e
                            ))
                        })?;
                    Ok(())
                };

                // On failure restore the snapshot taken above before bailing out.
                if let Err(e) = prune_and_import() {
                    debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
                    *epoch_changes =
                        old_epoch_changes.expect("set `Some` above and not taken; qed");
                    return Err(e);
                }

                // Queue the updated epoch changes in the block's auxiliary data.
                crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
                    block
                        .auxiliary
                        .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
                });
            }

            // Queue the total weight of this block in the block's auxiliary data.
            aux_schema::write_block_weight(hash, total_weight, |values| {
                block
                    .auxiliary
                    .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
            });

            // Fork choice: the block becomes best when its chain is heavier than the
            // current best chain; on equal weight the higher block number wins.
            block.fork_choice = {
                let (last_best, last_best_number) = (info.best_hash, info.best_number);

                let last_best_weight = if &last_best == block.header.parent_hash() {
                    // The current best block is the parent, whose weight is already known.
                    parent_weight
                } else {
                    aux_schema::load_block_weight(&*self.client, last_best)
                        .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ChainLookup(
                                "No block weight for parent header.".to_string(),
                            )
                        })?
                };

                Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
                    true
                } else if total_weight == last_best_weight {
                    number > last_best_number
                } else {
                    false
                }))
            };

            // NOTE(review): `release_mutex` presumably gives up the raw guard while
            // keeping the shared data locked, so that it can be re-acquired through
            // `upgrade` after the `.await` below — confirm against
            // `sc_consensus::shared_data`.
            Some(epoch_changes.release_mutex())
        } else {
            block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
            None
        };

        let import_result = self.inner.import_block(block).await;

        // Restore the snapshot of the epoch changes when the inner import failed.
        if import_result.is_err() {
            if let (Some(mut epoch_changes), Some(old_epoch_changes)) =
                (epoch_changes, old_epoch_changes)
            {
                *epoch_changes.upgrade() = old_epoch_changes;
            }
        }

        import_result.map_err(Into::into)
    }
1744
1745 async fn check_block(
1746 &self,
1747 block: BlockCheckParams<Block>,
1748 ) -> Result<ImportResult, Self::Error> {
1749 self.inner.check_block(block).await.map_err(Into::into)
1750 }
1751}
1752
1753fn prune_finalized<Block, Client>(
1755 client: Arc<Client>,
1756 epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1757) -> Result<(), ConsensusError>
1758where
1759 Block: BlockT,
1760 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
1761{
1762 let info = client.info();
1763
1764 let finalized_slot = {
1765 let finalized_header = client
1766 .header(info.finalized_hash)
1767 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1768 .expect(
1769 "best finalized hash was given by client; finalized headers must exist in db; qed",
1770 );
1771
1772 find_pre_digest::<Block>(&finalized_header)
1773 .expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1774 .slot()
1775 };
1776
1777 epoch_changes
1778 .prune_finalized(
1779 descendent_query(&*client),
1780 &info.finalized_hash,
1781 info.finalized_number,
1782 finalized_slot,
1783 )
1784 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1785
1786 Ok(())
1787}
1788
1789pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1795 config: BabeConfiguration,
1796 wrapped_block_import: I,
1797 client: Arc<Client>,
1798 create_inherent_data_providers: CIDP,
1799 select_chain: SC,
1800 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1801) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1802where
1803 Client: AuxStore
1804 + HeaderBackend<Block>
1805 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1806 + PreCommitActions<Block>
1807 + 'static,
1808{
1809 let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1810 let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1811
1812 prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1816
1817 let client_weak = Arc::downgrade(&client);
1818 let on_finality = move |summary: &FinalityNotification<Block>| {
1819 if let Some(client) = client_weak.upgrade() {
1820 aux_storage_cleanup(client.as_ref(), summary)
1821 } else {
1822 Default::default()
1823 }
1824 };
1825 client.register_finality_action(Box::new(on_finality));
1826
1827 let import = BabeBlockImport::new(
1828 client,
1829 epoch_changes,
1830 wrapped_block_import,
1831 config,
1832 create_inherent_data_providers,
1833 select_chain,
1834 offchain_tx_pool_factory,
1835 );
1836
1837 Ok((import, link))
1838}
1839
/// Parameters consumed by [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
    /// The BABE link; its configuration and epoch changes are shared with the
    /// verifier and with the background worker.
    pub link: BabeLink<Block>,
    /// The block import that is boxed and handed to the import queue.
    pub block_import: BI,
    /// Optional justification import handed to the import queue.
    pub justification_import: Option<BoxJustificationImport<Block>>,
    /// The client used by the verifier and by the background worker.
    pub client: Arc<Client>,
    /// The slot duration given to the verifier.
    pub slot_duration: SlotDuration,
    /// The spawner used for the `babe-worker` task and passed to the import queue.
    pub spawner: &'a Spawn,
    /// Optional Prometheus registry passed to the import queue.
    pub registry: Option<&'a Registry>,
    /// Optional telemetry handle given to the verifier.
    pub telemetry: Option<TelemetryHandle>,
}
1859
1860pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1870 ImportQueueParams {
1871 link: babe_link,
1872 block_import,
1873 justification_import,
1874 client,
1875 slot_duration,
1876 spawner,
1877 registry,
1878 telemetry,
1879 }: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1880) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1881where
1882 BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1883 Client: ProvideRuntimeApi<Block>
1884 + HeaderBackend<Block>
1885 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1886 + AuxStore
1887 + Send
1888 + Sync
1889 + 'static,
1890 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1891 Spawn: SpawnEssentialNamed,
1892{
1893 const HANDLE_BUFFER_SIZE: usize = 1024;
1894
1895 let verifier = BabeVerifier {
1896 slot_duration,
1897 config: babe_link.config.clone(),
1898 epoch_changes: babe_link.epoch_changes.clone(),
1899 telemetry,
1900 client: client.clone(),
1901 };
1902
1903 let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1904
1905 let answer_requests =
1906 answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1907
1908 spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1909
1910 Ok((
1911 BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1912 BabeWorkerHandle(worker_tx),
1913 ))
1914}
1915
1916pub fn revert<Block, Client, Backend>(
1920 client: Arc<Client>,
1921 backend: Arc<Backend>,
1922 blocks: NumberFor<Block>,
1923) -> ClientResult<()>
1924where
1925 Block: BlockT,
1926 Client: AuxStore
1927 + HeaderMetadata<Block, Error = sp_blockchain::Error>
1928 + HeaderBackend<Block>
1929 + ProvideRuntimeApi<Block>
1930 + UsageProvider<Block>,
1931 Client::Api: BabeApi<Block>,
1932 Backend: BackendT<Block>,
1933{
1934 let best_number = client.info().best_number;
1935 let finalized = client.info().finalized_number;
1936
1937 let revertible = blocks.min(best_number - finalized);
1938 if revertible == Zero::zero() {
1939 return Ok(());
1940 }
1941
1942 let revert_up_to_number = best_number - revertible;
1943 let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1944 format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1945 ))?;
1946
1947 let config = configuration(&*client)?;
1951 let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1952 let mut epoch_changes = epoch_changes.shared_data();
1953
1954 if revert_up_to_number == Zero::zero() {
1955 *epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1957 } else {
1958 epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1959 }
1960
1961 let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1964
1965 let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1966 sp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1967 .map(|route| route.retracted().is_empty())
1968 .unwrap_or_default()
1969 });
1970
1971 for leaf in leaves {
1972 let mut hash = leaf;
1973 loop {
1974 let meta = client.header_metadata(hash)?;
1975 if meta.number <= revert_up_to_number ||
1976 !weight_keys.insert(aux_schema::block_weight_key(hash))
1977 {
1978 break;
1980 }
1981 hash = meta.parent;
1982 }
1983 }
1984
1985 let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1986
1987 aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1989 client.insert_aux(values, weight_keys.iter())
1990 })
1991}
1992
1993fn query_epoch_changes<Block, Client>(
1994 epoch_changes: &SharedEpochChanges<Block, Epoch>,
1995 client: &Client,
1996 config: &BabeConfiguration,
1997 block_number: NumberFor<Block>,
1998 slot: Slot,
1999 parent_hash: Block::Hash,
2000) -> Result<
2001 (ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
2002 Error<Block>,
2003>
2004where
2005 Block: BlockT,
2006 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
2007{
2008 let epoch_changes = epoch_changes.shared_data();
2009 let epoch_descriptor = epoch_changes
2010 .epoch_descriptor_for_child_of(
2011 descendent_query(client),
2012 &parent_hash,
2013 block_number - 1u32.into(),
2014 slot,
2015 )
2016 .map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
2017 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2018 let viable_epoch = epoch_changes
2019 .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2020 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2021 Ok((epoch_descriptor, viable_epoch.into_cloned()))
2022}