#![forbid(unsafe_code)]
#![warn(missing_docs)]
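//! # BABE (Blind Assignment for Blockchain Extension)
//!
//! BABE is a slot-based block production mechanism which uses a VRF PRNG to
//! randomly perform the slot allocation. On every slot, all the authorities
//! generate a new random number with the VRF function and if it is lower than
//! a given threshold (which is proportional to their weight/stake) they have
//! the right to author a block. The VRF proof is included in the block's
//! pre-digest so other peers can validate the legitimacy of the slot claim.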

use std::{
    collections::HashSet,
    future::Future,
    ops::{Deref, DerefMut},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::Duration,
};

use codec::{Decode, Encode};
use futures::{
    channel::{
        mpsc::{channel, Receiver, Sender},
        oneshot,
    },
    prelude::*,
};
use log::{debug, info, log, trace, warn};
use parking_lot::Mutex;
use prometheus_endpoint::Registry;

use sc_client_api::{
    backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
    PreCommitActions, UsageProvider,
};
use sc_consensus::{
    block_import::{
        BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
        StateAction,
    },
    import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
};
use sc_consensus_epochs::{
    descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
    ViableEpochDescriptor,
};
use sc_consensus_slots::{
    check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
    SlotInfo, StorageChanges,
};
use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
use sc_transaction_pool_api::OffchainTransactionPoolFactory;
use sp_api::{ApiExt, ProvideRuntimeApi};
use sp_application_crypto::AppCrypto;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{
    Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
    Result as ClientResult,
};
use sp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
use sp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
use sp_consensus_slots::Slot;
use sp_core::traits::SpawnEssentialNamed;
use sp_inherents::{CreateInherentDataProviders, InherentDataProvider};
use sp_keystore::KeystorePtr;
use sp_runtime::{
    generic::OpaqueDigestItemId,
    traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
    DigestItem,
};

pub use sc_consensus_slots::SlotProportion;
pub use sp_consensus::SyncOracle;
pub use sp_consensus_babe::{
    digests::{
        CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
        PrimaryPreDigest, SecondaryPlainPreDigest,
    },
    AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
    BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
};

pub use aux_schema::load_block_weight as block_weight;
use sp_timestamp::Timestamp;

mod migration;
mod verification;

pub mod authorship;
pub mod aux_schema;
#[cfg(test)]
mod tests;

const LOG_TARGET: &str = "babe";

const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";

const AUTHORING_SCORE_LENGTH: usize = 16;

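/// BABE epoch information, a newtype wrapper around the epoch type defined in
/// the BABE primitives.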
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(sp_consensus_babe::Epoch);

impl Deref for Epoch {
    type Target = sp_consensus_babe::Epoch;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for Epoch {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

impl From<sp_consensus_babe::Epoch> for Epoch {
    fn from(epoch: sp_consensus_babe::Epoch) -> Self {
        Epoch(epoch)
    }
}

impl EpochT for Epoch {
    type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
    type Slot = Slot;

    fn increment(
        &self,
        (descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
    ) -> Epoch {
        sp_consensus_babe::Epoch {
            epoch_index: self.epoch_index + 1,
            start_slot: self.start_slot + self.duration,
            duration: self.duration,
            authorities: descriptor.authorities,
            randomness: descriptor.randomness,
            config,
        }
        .into()
    }

    fn start_slot(&self) -> Slot {
        self.start_slot
    }

    fn end_slot(&self) -> Slot {
        self.start_slot + self.duration
    }
}

impl Epoch {
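    /// Create the "genesis epoch", i.e. the epoch that starts at the given
    /// slot and is derived from the genesis BABE configuration.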
    pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
        sp_consensus_babe::Epoch {
            epoch_index: 0,
            start_slot: slot,
            duration: genesis_config.epoch_length,
            authorities: genesis_config.authorities.clone(),
            randomness: genesis_config.randomness,
            config: BabeEpochConfiguration {
                c: genesis_config.c,
                allowed_slots: genesis_config.allowed_slots,
            },
        }
        .into()
    }

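    /// Clone the epoch, updating its index and starting slot so that it covers
    /// the given slot, accounting for any whole epochs that were skipped in
    /// between.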
    pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
        let mut epoch = self.clone();

        let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;

        let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
            "epoch number is u64; it should be strictly smaller than number of slots; \
            slots relate in some way to wall clock time; \
            if u64 is not enough we should crash for safety; qed.",
        );

        let start_slot = skipped_epochs
            .checked_mul(epoch.duration)
            .and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
            .expect(
                "slot number is u64; it should relate in some way to wall clock time; \
                if u64 is not enough we should crash for safety; qed.",
            );

        epoch.epoch_index = epoch_index;
        epoch.start_slot = Slot::from(start_slot);

        epoch
    }
}

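/// Errors encountered by the BABE authorship and import logic.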
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
    #[error("Multiple BABE pre-runtime digests, rejecting!")]
    MultiplePreRuntimeDigests,
    #[error("No BABE pre-runtime digest found")]
    NoPreRuntimeDigest,
    #[error("Multiple BABE epoch change digests, rejecting!")]
    MultipleEpochChangeDigests,
    #[error("Multiple BABE config change digests, rejecting!")]
    MultipleConfigChangeDigests,
    #[error("Could not extract timestamp and slot: {0}")]
    Extraction(ConsensusError),
    #[error("Could not fetch epoch at {0:?}")]
    FetchEpoch(B::Hash),
    #[error("Header {0:?} rejected: too far in the future")]
    TooFarInFuture(B::Hash),
    #[error("Parent ({0}) of {1} unavailable. Cannot import")]
    ParentUnavailable(B::Hash, B::Hash),
    #[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
    SlotMustIncrease(Slot, Slot),
    #[error("Header {0:?} has a bad seal")]
    HeaderBadSeal(B::Hash),
    #[error("Header {0:?} is unsealed")]
    HeaderUnsealed(B::Hash),
    #[error("Slot author not found")]
    SlotAuthorNotFound,
    #[error("Secondary slot assignments are disabled for the current epoch.")]
    SecondarySlotAssignmentsDisabled,
    #[error("Bad signature on {0:?}")]
    BadSignature(B::Hash),
    #[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
    InvalidAuthor(AuthorityId, AuthorityId),
    #[error("No secondary author expected.")]
    NoSecondaryAuthorExpected,
    #[error("VRF verification failed")]
    VrfVerificationFailed,
    #[error("VRF output rejected, threshold {0} exceeded")]
    VrfThresholdExceeded(u128),
    #[error("Could not fetch parent header: {0}")]
    FetchParentHeader(sp_blockchain::Error),
    #[error("Expected epoch change to happen at {0:?}, s{1}")]
    ExpectedEpochChange(B::Hash, Slot),
    #[error("Unexpected config change")]
    UnexpectedConfigChange,
    #[error("Unexpected epoch change")]
    UnexpectedEpochChange,
    #[error("Parent block of {0} has no associated weight")]
    ParentBlockNoAssociatedWeight(B::Hash),
    #[error("Checking inherents failed: {0}")]
    CheckInherents(sp_inherents::Error),
    #[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
    CheckInherentsUnhandled(sp_inherents::InherentIdentifier),
    #[error("Creating inherents failed: {0}")]
    CreateInherents(sp_inherents::Error),
    #[error("Background worker is not running")]
    BackgroundWorkerTerminated,
    #[error(transparent)]
    Client(sp_blockchain::Error),
    #[error(transparent)]
    RuntimeApi(sp_api::ApiError),
    #[error(transparent)]
    ForkTree(Box<fork_tree::Error<sp_blockchain::Error>>),
}

impl<B: BlockT> From<Error<B>> for String {
    fn from(error: Error<B>) -> String {
        error.to_string()
    }
}

fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
    debug!(target: LOG_TARGET, "{}", error);
    error
}

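/// Intermediate value passed to the block importer, carrying the viable epoch
/// descriptor for the block being imported.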
pub struct BabeIntermediate<B: BlockT> {
    pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}

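/// Key under which the `BabeIntermediate` is stored in the block import intermediates.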
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";

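/// Read the current BABE configuration from the runtime, using the best block
/// when a finalized state is available and falling back to the genesis block
/// otherwise.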
pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
where
    C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
    C::Api: BabeApi<B>,
{
    let at_hash = if client.usage_info().chain.finalized_state.is_some() {
        client.usage_info().chain.best_hash
    } else {
        debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
        client.usage_info().chain.genesis_hash
    };

    let runtime_api = client.runtime_api();
    let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;

    let config = match version {
        Some(1) => {
            #[allow(deprecated)]
            {
                runtime_api.configuration_before_version_2(at_hash)?.into()
            }
        },
        Some(2) => runtime_api.configuration(at_hash)?,
        _ =>
            return Err(sp_blockchain::Error::VersionInvalid(
                "Unsupported or invalid BabeApi version".to_string(),
            )),
    };
    Ok(config)
}

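/// Parameters for BABE authorship, passed to [`start_babe`].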
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
    pub keystore: KeystorePtr,

    pub client: Arc<C>,

    pub select_chain: SC,

    pub env: E,

    pub block_import: I,

    pub sync_oracle: SO,

    pub justification_sync_link: L,

    pub create_inherent_data_providers: CIDP,

    pub force_authoring: bool,

    pub backoff_authoring_blocks: Option<BS>,

    pub babe_link: BabeLink<B>,

    pub block_proposal_slot_portion: SlotProportion,

    pub max_block_proposal_slot_portion: Option<SlotProportion>,

    pub telemetry: Option<TelemetryHandle>,
}

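/// Start the BABE worker. The returned [`BabeWorker`] is a future that must be
/// polled for block authorship to make progress.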
pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
    BabeParams {
        keystore,
        client,
        select_chain,
        env,
        block_import,
        sync_oracle,
        justification_sync_link,
        create_inherent_data_providers,
        force_authoring,
        backoff_authoring_blocks,
        babe_link,
        block_proposal_slot_portion,
        max_block_proposal_slot_portion,
        telemetry,
    }: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
) -> Result<BabeWorker<B>, ConsensusError>
where
    B: BlockT,
    C: ProvideRuntimeApi<B>
        + HeaderBackend<B>
        + HeaderMetadata<B, Error = ClientError>
        + Send
        + Sync
        + 'static,
    C::Api: BabeApi<B>,
    SC: SelectChain<B> + 'static,
    E: Environment<B, Error = Error> + Send + Sync + 'static,
    E::Proposer: Proposer<B, Error = Error>,
    I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + Clone + 'static,
    L: sc_consensus::JustificationSyncLink<B> + 'static,
    CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
    CIDP::InherentDataProviders: InherentDataProviderExt + Send,
    BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
    Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
    let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

    let worker = BabeSlotWorker {
        client: client.clone(),
        block_import,
        env,
        sync_oracle: sync_oracle.clone(),
        justification_sync_link,
        force_authoring,
        backoff_authoring_blocks,
        keystore,
        epoch_changes: babe_link.epoch_changes.clone(),
        slot_notification_sinks: slot_notification_sinks.clone(),
        config: babe_link.config.clone(),
        block_proposal_slot_portion,
        max_block_proposal_slot_portion,
        telemetry,
    };

    info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");

    let slot_worker = sc_consensus_slots::start_slot_worker(
        babe_link.config.slot_duration(),
        select_chain,
        sc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
        sync_oracle,
        create_inherent_data_providers,
    );

    Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
}

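/// Prepare auxiliary-data deletions that clear the block-weight entries of
/// blocks that were just finalized (excluding the new finalized head) or
/// discarded as stale by the given finality notification.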
fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
    client: &C,
    notification: &FinalityNotification<Block>,
) -> AuxDataOperations {
    let mut hashes = HashSet::new();

    let first = notification.tree_route.first().unwrap_or(&notification.hash);
    match client.header_metadata(*first) {
        Ok(meta) => {
            hashes.insert(meta.parent);
        },
        Err(err) => {
            warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
        },
    }

    hashes.extend(
        notification
            .tree_route
            .iter()
            .filter(|h| **h != notification.hash),
    );

    hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));

    hashes
        .into_iter()
        .map(|val| (aux_schema::block_weight_key(val), None))
        .collect()
}

async fn answer_requests<B: BlockT, C>(
    mut request_rx: Receiver<BabeRequest<B>>,
    config: BabeConfiguration,
    client: Arc<C>,
    epoch_changes: SharedEpochChanges<B, Epoch>,
) where
    C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
{
    while let Some(request) = request_rx.next().await {
        match request {
            BabeRequest::EpochData(response) => {
                let _ = response.send(epoch_changes.shared_data().clone());
            },
            BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
                let lookup = || {
                    let epoch_changes = epoch_changes.shared_data();
                    epoch_changes
                        .epoch_data_for_child_of(
                            descendent_query(&*client),
                            &parent_hash,
                            parent_number,
                            slot,
                            |slot| Epoch::genesis(&config, slot),
                        )
                        .map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
                        .ok_or(Error::<B>::FetchEpoch(parent_hash))
                };

                let _ = response.send(lookup());
            },
        }
    }
}

enum BabeRequest<B: BlockT> {
    EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
    EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}

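/// A handle to the BABE worker for issuing requests.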
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);

impl<B: BlockT> BabeWorkerHandle<B> {
    async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
        match self.0.clone().send(request).await {
            Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
            Err(err) => warn!(
                target: LOG_TARGET,
                "Unhandled error when sending request to worker: {:?}", err
            ),
            _ => {},
        }

        Ok(())
    }

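    /// Get a copy of the current epoch-changes tree, as seen by the worker.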
    pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
        let (tx, rx) = oneshot::channel();
        self.send_request(BabeRequest::EpochData(tx)).await?;

        rx.await.or(Err(Error::BackgroundWorkerTerminated))
    }

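    /// Get the epoch that should be used when authoring a child of the given
    /// parent block at the given slot.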
    pub async fn epoch_data_for_child_of(
        &self,
        parent_hash: B::Hash,
        parent_number: NumberFor<B>,
        slot: Slot,
    ) -> Result<Epoch, Error<B>> {
        let (tx, rx) = oneshot::channel();
        self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
            .await?;

        rx.await.or(Err(Error::BackgroundWorkerTerminated))?
    }
}

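/// Worker for BABE which implements `Future<Output = ()>`. This must be polled.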
#[must_use]
pub struct BabeWorker<B: BlockT> {
    inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    slot_notification_sinks: SlotNotificationSinks<B>,
}

impl<B: BlockT> BabeWorker<B> {
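    /// Return an event stream of notifications for when new slots happen,
    /// together with the corresponding epoch descriptor.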
    pub fn slot_notification_stream(
        &self,
    ) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
        const CHANNEL_BUFFER_SIZE: usize = 1024;

        let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
        self.slot_notification_sinks.lock().push(sink);
        stream
    }
}

impl<B: BlockT> Future for BabeWorker<B> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.inner.as_mut().poll(cx)
    }
}

type SlotNotificationSinks<B> = Arc<
    Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;

struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
    client: Arc<C>,
    block_import: I,
    env: E,
    sync_oracle: SO,
    justification_sync_link: L,
    force_authoring: bool,
    backoff_authoring_blocks: Option<BS>,
    keystore: KeystorePtr,
    epoch_changes: SharedEpochChanges<B, Epoch>,
    slot_notification_sinks: SlotNotificationSinks<B>,
    config: BabeConfiguration,
    block_proposal_slot_portion: SlotProportion,
    max_block_proposal_slot_portion: Option<SlotProportion>,
    telemetry: Option<TelemetryHandle>,
}

#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> sc_consensus_slots::SimpleSlotWorker<B>
    for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
    B: BlockT,
    C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
    C::Api: BabeApi<B>,
    E: Environment<B, Error = Error> + Send + Sync,
    E::Proposer: Proposer<B, Error = Error>,
    I: BlockImport<B> + Send + Sync + 'static,
    SO: SyncOracle + Send + Clone + Sync,
    L: sc_consensus::JustificationSyncLink<B>,
    BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
    Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
    type Claim = (PreDigest, AuthorityId);
    type SyncOracle = SO;
    type JustificationSyncLink = L;
    type CreateProposer =
        Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
    type Proposer = E::Proposer;
    type BlockImport = I;
    type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;

    fn logging_target(&self) -> &'static str {
        LOG_TARGET
    }

    fn block_import(&mut self) -> &mut Self::BlockImport {
        &mut self.block_import
    }

    fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
        self.epoch_changes
            .shared_data()
            .epoch_descriptor_for_child_of(
                descendent_query(&*self.client),
                &parent.hash(),
                *parent.number(),
                slot,
            )
            .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
            .ok_or(ConsensusError::InvalidAuthoritiesSet)
    }

    fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
        self.epoch_changes
            .shared_data()
            .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
            .map(|epoch| epoch.as_ref().authorities.len())
    }

    async fn claim_slot(
        &mut self,
        _parent_header: &B::Header,
        slot: Slot,
        epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
    ) -> Option<Self::Claim> {
        debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
        let s = authorship::claim_slot(
            slot,
            self.epoch_changes
                .shared_data()
                .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
                .as_ref(),
            &self.keystore,
        );

        if s.is_some() {
            debug!(target: LOG_TARGET, "Claimed slot {}", slot);
        }

        s
    }

    fn notify_slot(
        &self,
        _parent_header: &B::Header,
        slot: Slot,
        epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
    ) {
        let sinks = &mut self.slot_notification_sinks.lock();
        sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
            Ok(()) => true,
            Err(e) =>
                if e.is_full() {
                    warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
                    true
                } else {
                    false
                },
        });
    }

    fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<sp_runtime::DigestItem> {
        vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
    }

    async fn block_import_params(
        &self,
        header: B::Header,
        header_hash: &B::Hash,
        body: Vec<B::Extrinsic>,
        storage_changes: StorageChanges<B>,
        (_, public): Self::Claim,
        epoch_descriptor: Self::AuxData,
    ) -> Result<BlockImportParams<B>, ConsensusError> {
        let signature = self
            .keystore
            .sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
            .map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
            .ok_or_else(|| {
                ConsensusError::CannotSign(format!(
                    "Could not find key in keystore. Key: {:?}",
                    public
                ))
            })?;

        let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());

        let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
        import_block.post_digests.push(digest_item);
        import_block.body = Some(body);
        import_block.state_action =
            StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
        import_block
            .insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });

        Ok(import_block)
    }

    fn force_authoring(&self) -> bool {
        self.force_authoring
    }

    fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
        if let Some(ref strategy) = self.backoff_authoring_blocks {
            if let Ok(chain_head_slot) =
                find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
            {
                return strategy.should_backoff(
                    *chain_head.number(),
                    chain_head_slot,
                    self.client.info().finalized_number,
                    slot,
                    self.logging_target(),
                )
            }
        }
        false
    }

    fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
        &mut self.sync_oracle
    }

    fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
        &mut self.justification_sync_link
    }

    fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
        Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
    }

    fn telemetry(&self) -> Option<TelemetryHandle> {
        self.telemetry.clone()
    }

    fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
        let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

        sc_consensus_slots::proposing_remaining_duration(
            parent_slot,
            slot_info,
            &self.block_proposal_slot_portion,
            self.max_block_proposal_slot_portion.as_ref(),
            sc_consensus_slots::SlotLenienceType::Exponential,
            self.logging_target(),
        )
    }
}

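/// Extract the BABE pre-digest from the given header. Pre-digests are
/// mandatory in every non-genesis BABE block; for the genesis block a dummy
/// secondary-plain pre-digest is returned.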
pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
    if header.number().is_zero() {
        return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
            slot: 0.into(),
            authority_index: 0,
        }))
    }

    let mut pre_digest: Option<_> = None;
    for log in header.digest().logs() {
        trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
        match (log.as_babe_pre_digest(), pre_digest.is_some()) {
            (Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
            (None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
            (s, false) => pre_digest = s,
        }
    }
    pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
}

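/// Returns true if the given header contains a BABE epoch-change digest.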
pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
    find_next_epoch_digest::<B>(header).ok().flatten().is_some()
}

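/// Extract the BABE epoch-change digest from the given header, if any.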
pub fn find_next_epoch_digest<B: BlockT>(
    header: &B::Header,
) -> Result<Option<NextEpochDescriptor>, Error<B>> {
    let mut epoch_digest: Option<_> = None;
    for log in header.digest().logs() {
        trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
        let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
        match (log, epoch_digest.is_some()) {
            (Some(ConsensusLog::NextEpochData(_)), true) =>
                return Err(babe_err(Error::MultipleEpochChangeDigests)),
            (Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
            _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
        }
    }

    Ok(epoch_digest)
}

fn find_next_config_digest<B: BlockT>(
    header: &B::Header,
) -> Result<Option<NextConfigDescriptor>, Error<B>> {
    let mut config_digest: Option<_> = None;
    for log in header.digest().logs() {
        trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
        let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
        match (log, config_digest.is_some()) {
            (Some(ConsensusLog::NextConfigData(_)), true) =>
                return Err(babe_err(Error::MultipleConfigChangeDigests)),
            (Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
            _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
        }
    }

    Ok(config_digest)
}

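/// State that must be shared between the import queue and the authoring logic.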
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    config: BabeConfiguration,
}

impl<Block: BlockT> BabeLink<Block> {
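    /// Get the shared epoch-changes tree of this link.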
    pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
        &self.epoch_changes
    }

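    /// Get the BABE configuration of this link.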
    pub fn config(&self) -> &BabeConfiguration {
        &self.config
    }
}

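/// A verifier for BABE blocks.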
pub struct BabeVerifier<Block: BlockT, Client> {
    client: Arc<Client>,
    slot_duration: SlotDuration,
    config: BabeConfiguration,
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    telemetry: Option<TelemetryHandle>,
}

#[async_trait::async_trait]
impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
where
    Block: BlockT,
    Client: HeaderMetadata<Block, Error = sp_blockchain::Error>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + AuxStore,
    Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
{
    async fn verify(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<BlockImportParams<Block>, String> {
        trace!(
            target: LOG_TARGET,
            "Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
            block.origin,
            block.header,
            block.justifications,
            block.body,
        );

        let hash = block.header.hash();
        let parent_hash = *block.header.parent_hash();

        let number = block.header.number();

        if is_state_sync_or_gap_sync_import(&*self.client, &block) {
            return Ok(block)
        }

        debug!(
            target: LOG_TARGET,
            "We have {:?} logs in this header",
            block.header.digest().logs().len()
        );

        let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);

        let pre_digest = find_pre_digest::<Block>(&block.header)?;
        let (check_header, epoch_descriptor) = {
            let (epoch_descriptor, viable_epoch) = query_epoch_changes(
                &self.epoch_changes,
                self.client.as_ref(),
                &self.config,
                *number,
                pre_digest.slot(),
                parent_hash,
            )?;

            let v_params = verification::VerificationParams {
                header: block.header.clone(),
                pre_digest: Some(pre_digest),
                slot_now: slot_now + 1,
                epoch: viable_epoch.as_ref(),
            };

            (verification::check_header::<Block>(v_params)?, epoch_descriptor)
        };

        match check_header {
            CheckedHeader::Checked(pre_header, verified_info) => {
                trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
                telemetry!(
                    self.telemetry;
                    CONSENSUS_TRACE;
                    "babe.checked_and_importing";
                    "pre_header" => ?pre_header,
                );

                block.header = pre_header;
                block.post_digests.push(verified_info.seal);
                block.insert_intermediate(
                    INTERMEDIATE_KEY,
                    BabeIntermediate::<Block> { epoch_descriptor },
                );
                block.post_hash = Some(hash);

                Ok(block)
            },
            CheckedHeader::Deferred(a, b) => {
                debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
                telemetry!(
                    self.telemetry;
                    CONSENSUS_DEBUG;
                    "babe.header_too_far_in_future";
                    "hash" => ?hash, "a" => ?a, "b" => ?b
                );
                Err(Error::<Block>::TooFarInFuture(hash).into())
            },
        }
    }
}

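/// Returns `true` when the block either carries its own state (state sync
/// import) or falls within the client's tracked block gap; such imports skip
/// BABE verification and epoch bookkeeping.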
fn is_state_sync_or_gap_sync_import<B: BlockT>(
    client: &impl HeaderBackend<B>,
    block: &BlockImportParams<B>,
) -> bool {
    let number = *block.header.number();
    let info = client.info();
    info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
        block.with_state()
}

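/// A block-import handler for BABE.
///
/// This scans each imported block for epoch change signals. The signals are
/// tracked in a tree (of all forks), and the import logic validates all epoch
/// change transitions, i.e. whether a given epoch change is expected or
/// whether it is missing.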
pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
    inner: I,
    client: Arc<Client>,
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    create_inherent_data_providers: CIDP,
    config: BabeConfiguration,
    select_chain: SC,
    offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}

impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
    for BabeBlockImport<Block, Client, I, CIDP, SC>
{
    fn clone(&self) -> Self {
        BabeBlockImport {
            inner: self.inner.clone(),
            client: self.client.clone(),
            epoch_changes: self.epoch_changes.clone(),
            config: self.config.clone(),
            create_inherent_data_providers: self.create_inherent_data_providers.clone(),
            select_chain: self.select_chain.clone(),
            offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
        }
    }
}

impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
    fn new(
        client: Arc<Client>,
        epoch_changes: SharedEpochChanges<Block, Epoch>,
        block_import: I,
        config: BabeConfiguration,
        create_inherent_data_providers: CIDP,
        select_chain: SC,
        offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
    ) -> Self {
        BabeBlockImport {
            client,
            inner: block_import,
            epoch_changes,
            config,
            create_inherent_data_providers,
            select_chain,
            offchain_tx_pool_factory,
        }
    }
}

impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
    Block: BlockT,
    Inner: BlockImport<Block> + Send + Sync,
    Inner::Error: Into<ConsensusError>,
    Client: HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + AuxStore
        + ProvideRuntimeApi<Block>
        + Send
        + Sync,
    Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
    CIDP: CreateInherentDataProviders<Block, ()>,
    CIDP::InherentDataProviders: InherentDataProviderExt + Send,
    SC: sp_consensus::SelectChain<Block> + 'static,
{
    async fn import_state(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<ImportResult, ConsensusError> {
        let hash = block.post_hash();
        let parent_hash = *block.header.parent_hash();
        let number = *block.header.number();

        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
        aux_schema::write_block_weight(hash, 0, |values| {
            block
                .auxiliary
                .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
        });

        let import_result = self.inner.import_block(block).await;
        let aux = match import_result {
            Ok(ImportResult::Imported(aux)) => aux,
            Ok(r) =>
                return Err(ConsensusError::ClientImport(format!(
                    "Unexpected import result: {:?}",
                    r
                ))),
            Err(r) => return Err(r.into()),
        };

        let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
            ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
        })?;
        let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
            ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
        })?;

        let mut epoch_changes = self.epoch_changes.shared_data_locked();
        epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
        aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
            self.client.insert_aux(insert, [])
        })
        .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

        Ok(ImportResult::Imported(aux))
    }

    async fn check_inherents_and_equivocations(
        &self,
        block: &mut BlockImportParams<Block>,
    ) -> Result<(), ConsensusError> {
        if is_state_sync_or_gap_sync_import(&*self.client, block) {
            return Ok(())
        }

        let parent_hash = *block.header.parent_hash();
        let number = *block.header.number();

        let create_inherent_data_providers = self
            .create_inherent_data_providers
            .create_inherent_data_providers(parent_hash, ())
            .await?;

        let slot_now = create_inherent_data_providers.slot();

        let babe_pre_digest = find_pre_digest::<Block>(&block.header)
            .map_err(|e| ConsensusError::Other(Box::new(e)))?;
        let slot = babe_pre_digest.slot();

        self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
            .await?;

        let author = {
            let viable_epoch = query_epoch_changes(
                &self.epoch_changes,
                self.client.as_ref(),
                &self.config,
                number,
                slot,
                parent_hash,
            )
            .map_err(|e| ConsensusError::Other(babe_err(e).into()))?
            .1;
            match viable_epoch
                .as_ref()
                .authorities
                .get(babe_pre_digest.authority_index() as usize)
            {
                Some(author) => author.0.clone(),
                None =>
                    return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into())),
            }
        };
        if let Err(err) = self
            .check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
            .await
        {
            warn!(
                target: LOG_TARGET,
                "Error checking/reporting BABE equivocation: {}", err
            );
        }
        Ok(())
    }

    async fn check_inherents(
        &self,
        block: &mut BlockImportParams<Block>,
        at_hash: Block::Hash,
        slot: Slot,
        create_inherent_data_providers: CIDP::InherentDataProviders,
    ) -> Result<(), ConsensusError> {
        if block.state_action.skip_execution_checks() {
            return Ok(())
        }

        if let Some(inner_body) = block.body.take() {
            let new_block = Block::new(block.header.clone(), inner_body);
            let mut inherent_data = create_inherent_data_providers
                .create_inherent_data()
                .await
                .map_err(|e| ConsensusError::Other(Box::new(e)))?;
            inherent_data.babe_replace_inherent_data(slot);

            use sp_block_builder::CheckInherentsError;

            sp_block_builder::check_inherents_with_data(
                self.client.clone(),
                at_hash,
                new_block.clone(),
                &create_inherent_data_providers,
                inherent_data,
            )
            .await
            .map_err(|e| {
                ConsensusError::Other(Box::new(match e {
                    CheckInherentsError::CreateInherentData(e) =>
                        Error::<Block>::CreateInherents(e),
                    CheckInherentsError::Client(e) => Error::RuntimeApi(e),
                    CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
                    CheckInherentsError::CheckInherentsUnknownError(id) =>
                        Error::CheckInherentsUnhandled(id),
                }))
            })?;
            let (_, inner_body) = new_block.deconstruct();
            block.body = Some(inner_body);
        }

        Ok(())
    }

    async fn check_and_report_equivocation(
        &self,
        slot_now: Slot,
        slot: Slot,
        header: &Block::Header,
        author: &AuthorityId,
        origin: &BlockOrigin,
    ) -> Result<(), Error<Block>> {
        if *origin == BlockOrigin::NetworkInitialSync {
            return Ok(())
        }

        let Some(equivocation_proof) =
            check_equivocation(&*self.client, slot_now, slot, header, author)
                .map_err(Error::Client)?
        else {
            return Ok(())
        };

        info!(
            "Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
            author,
            slot,
            equivocation_proof.first_header.hash(),
            equivocation_proof.second_header.hash(),
        );

        let best_hash = self
            .select_chain
            .best_chain()
            .await
            .map(|h| h.hash())
            .map_err(|e| Error::Client(e.into()))?;

        let generate_key_owner_proof = |at_hash: Block::Hash| {
            self.client
                .runtime_api()
                .generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
                .map_err(Error::RuntimeApi)
        };

        let parent_hash = *header.parent_hash();
        let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
            Some(proof) => proof,
            None => match generate_key_owner_proof(best_hash)? {
                Some(proof) => proof,
                None => {
                    debug!(
                        target: LOG_TARGET,
                        "Equivocation offender is not part of the authority set."
                    );
                    return Ok(())
                },
            },
        };

        let mut runtime_api = self.client.runtime_api();

        runtime_api
            .register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));

        runtime_api
            .submit_report_equivocation_unsigned_extrinsic(
                best_hash,
                equivocation_proof,
                key_owner_proof,
            )
            .map_err(Error::RuntimeApi)?;

        info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);

        Ok(())
    }
}

#[async_trait::async_trait]
impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
    for BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
    Block: BlockT,
    Inner: BlockImport<Block> + Send + Sync,
    Inner::Error: Into<ConsensusError>,
    Client: HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + AuxStore
        + ProvideRuntimeApi<Block>
        + Send
        + Sync,
    Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
    CIDP: CreateInherentDataProviders<Block, ()>,
    CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
    SC: SelectChain<Block> + 'static,
{
    type Error = ConsensusError;

    async fn import_block(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        let hash = block.post_hash();
        let number = *block.header.number();
        let info = self.client.info();

        self.check_inherents_and_equivocations(&mut block).await?;

        let block_status = self
            .client
            .status(hash)
            .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

        if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end) ||
            block_status == BlockStatus::InChain
        {
            let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
            block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
            return self.inner.import_block(block).await.map_err(Into::into)
        }

        if block.with_state() {
            return self.import_state(block).await
        }

        let pre_digest = find_pre_digest::<Block>(&block.header).expect(
            "valid babe headers must contain a predigest; header has been already verified; qed",
        );
        let slot = pre_digest.slot();

        let parent_hash = *block.header.parent_hash();
        let parent_header = self
            .client
            .header(parent_hash)
            .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
            .ok_or_else(|| {
                ConsensusError::ChainLookup(
                    babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
                )
            })?;

        let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
            "parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
            been verified; qed",
        );

        if slot <= parent_slot {
            return Err(ConsensusError::ClientImport(
                babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
            ))
        }

        let mut old_epoch_changes = None;

        let mut epoch_changes = {
            let mut epoch_changes = self.epoch_changes.shared_data_locked();

            let (epoch_descriptor, first_in_epoch, parent_weight) = {
                let parent_weight = if *parent_header.number() == Zero::zero() {
                    0
                } else {
                    aux_schema::load_block_weight(&*self.client, parent_hash)
                        .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ClientImport(
                                babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
                                    .into(),
                            )
                        })?
                };

                let intermediate =
                    block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;

                let epoch_descriptor = intermediate.epoch_descriptor;
                let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
                (epoch_descriptor, first_in_epoch, parent_weight)
            };

            let total_weight = parent_weight + pre_digest.added_weight();

            let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
            let next_config_digest = find_next_config_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

            match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
                (true, true, _) => {},
                (false, false, false) => {},
                (false, false, true) =>
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedConfigChange).into(),
                    )),
                (true, false, _) =>
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
                    )),
                (false, true, _) =>
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedEpochChange).into(),
                    )),
            }

            if let Some(next_epoch_descriptor) = next_epoch_digest {
                old_epoch_changes = Some((*epoch_changes).clone());

                let mut viable_epoch = epoch_changes
                    .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
                    .ok_or_else(|| {
                        ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
                    })?
                    .into_cloned();

                let epoch_config = next_config_digest
                    .map(Into::into)
                    .unwrap_or_else(|| viable_epoch.as_ref().config.clone());

                let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
                    log::Level::Debug
                } else {
                    log::Level::Info
                };

                if viable_epoch.as_ref().end_slot() <= slot {
                    let epoch = viable_epoch.as_mut();
                    let prev_index = epoch.epoch_index;
                    *epoch = epoch.clone_for_slot(slot);

                    warn!(
                        target: LOG_TARGET,
                        "👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
                    );
                }

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
                    viable_epoch.as_ref().epoch_index,
                    hash,
                    slot,
                    viable_epoch.as_ref().start_slot,
                );

                let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 Next epoch starts at slot {}",
                    next_epoch.as_ref().start_slot,
                );

                let prune_and_import = || {
                    prune_finalized(self.client.clone(), &mut epoch_changes)?;

                    epoch_changes
                        .import(
                            descendent_query(&*self.client),
                            hash,
                            number,
                            *block.header.parent_hash(),
                            next_epoch,
                        )
                        .map_err(|e| {
                            ConsensusError::ClientImport(format!(
                                "Error importing epoch changes: {}",
                                e
                            ))
                        })?;
                    Ok(())
                };

                if let Err(e) = prune_and_import() {
                    debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
                    *epoch_changes =
                        old_epoch_changes.expect("set `Some` above and not taken; qed");
                    return Err(e)
                }

                crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
                    block
                        .auxiliary
                        .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
                });
            }

            aux_schema::write_block_weight(hash, total_weight, |values| {
                block
                    .auxiliary
                    .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
            });

            block.fork_choice = {
                let (last_best, last_best_number) = (info.best_hash, info.best_number);

                let last_best_weight = if &last_best == block.header.parent_hash() {
                    parent_weight
                } else {
                    aux_schema::load_block_weight(&*self.client, last_best)
                        .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ChainLookup(
                                "No block weight for parent header.".to_string(),
                            )
                        })?
                };

                Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
                    true
                } else if total_weight == last_best_weight {
                    number > last_best_number
                } else {
                    false
                }))
            };

            epoch_changes.release_mutex()
        };

        let import_result = self.inner.import_block(block).await;

        if import_result.is_err() {
            if let Some(old_epoch_changes) = old_epoch_changes {
                *epoch_changes.upgrade() = old_epoch_changes;
            }
        }

        import_result.map_err(Into::into)
    }

    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.inner.check_block(block).await.map_err(Into::into)
    }
}

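/// Prune the epoch-changes tree of all data below the last finalized block.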
fn prune_finalized<Block, Client>(
    client: Arc<Client>,
    epoch_changes: &mut EpochChangesFor<Block, Epoch>,
) -> Result<(), ConsensusError>
where
    Block: BlockT,
    Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
    let info = client.info();

    let finalized_slot = {
        let finalized_header = client
            .header(info.finalized_hash)
            .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
            .expect(
                "best finalized hash was given by client; finalized headers must exist in db; qed",
            );

        find_pre_digest::<Block>(&finalized_header)
            .expect("finalized header must be valid; valid blocks have a pre-digest; qed")
            .slot()
    };

    epoch_changes
        .prune_finalized(
            descendent_query(&*client),
            &info.finalized_hash,
            info.finalized_number,
            finalized_slot,
        )
        .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

    Ok(())
}

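/// Produce a BABE block-import object to be used later on in the construction
/// of an import-queue.
///
/// Also returns a link object used to correctly instantiate the import queue
/// and background worker.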
pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
    config: BabeConfiguration,
    wrapped_block_import: I,
    client: Arc<Client>,
    create_inherent_data_providers: CIDP,
    select_chain: SC,
    offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
where
    Client: AuxStore
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + PreCommitActions<Block>
        + 'static,
{
    let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
    let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };

    prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;

    let client_weak = Arc::downgrade(&client);
    let on_finality = move |summary: &FinalityNotification<Block>| {
        if let Some(client) = client_weak.upgrade() {
            aux_storage_cleanup(client.as_ref(), summary)
        } else {
            Default::default()
        }
    };
    client.register_finality_action(Box::new(on_finality));

    let import = BabeBlockImport::new(
        client,
        epoch_changes,
        wrapped_block_import,
        config,
        create_inherent_data_providers,
        select_chain,
        offchain_tx_pool_factory,
    );

    Ok((import, link))
}

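/// Parameters passed to [`import_queue`].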
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
    pub link: BabeLink<Block>,
    pub block_import: BI,
    pub justification_import: Option<BoxJustificationImport<Block>>,
    pub client: Arc<Client>,
    pub slot_duration: SlotDuration,
    pub spawner: &'a Spawn,
    pub registry: Option<&'a Registry>,
    pub telemetry: Option<TelemetryHandle>,
}

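/// Start an import queue for the BABE consensus algorithm.
///
/// Returns the import queue and a handle to the background worker that answers
/// epoch-data requests; the worker task is spawned on the provided `spawner`.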
pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
    ImportQueueParams {
        link: babe_link,
        block_import,
        justification_import,
        client,
        slot_duration,
        spawner,
        registry,
        telemetry,
    }: ImportQueueParams<'_, Block, BI, Client, Spawn>,
) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
where
    BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
    Client: ProvideRuntimeApi<Block>
        + HeaderBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + AuxStore
        + Send
        + Sync
        + 'static,
    Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
    Spawn: SpawnEssentialNamed,
{
    const HANDLE_BUFFER_SIZE: usize = 1024;

    let verifier = BabeVerifier {
        slot_duration,
        config: babe_link.config.clone(),
        epoch_changes: babe_link.epoch_changes.clone(),
        telemetry,
        client: client.clone(),
    };

    let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);

    let answer_requests =
        answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);

    spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());

    Ok((
        BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
        BabeWorkerHandle(worker_tx),
    ))
}

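/// Reverts protocol aux data to at most the last finalized block.
/// In particular, epoch-changes and block weights announced after the revert
/// point are removed.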
pub fn revert<Block, Client, Backend>(
    client: Arc<Client>,
    backend: Arc<Backend>,
    blocks: NumberFor<Block>,
) -> ClientResult<()>
where
    Block: BlockT,
    Client: AuxStore
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + UsageProvider<Block>,
    Client::Api: BabeApi<Block>,
    Backend: BackendT<Block>,
{
    let best_number = client.info().best_number;
    let finalized = client.info().finalized_number;

    let revertible = blocks.min(best_number - finalized);
    if revertible == Zero::zero() {
        return Ok(())
    }

    let revert_up_to_number = best_number - revertible;
    let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
        format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
    ))?;

    let config = configuration(&*client)?;
    let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
    let mut epoch_changes = epoch_changes.shared_data();

    if revert_up_to_number == Zero::zero() {
        *epoch_changes = EpochChangesFor::<Block, Epoch>::default();
    } else {
        epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
    }

    let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());

    let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
        sp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
            .map(|route| route.retracted().is_empty())
            .unwrap_or_default()
    });

    for leaf in leaves {
        let mut hash = leaf;
        loop {
            let meta = client.header_metadata(hash)?;
            if meta.number <= revert_up_to_number ||
                !weight_keys.insert(aux_schema::block_weight_key(hash))
            {
                break
            }
            hash = meta.parent;
        }
    }

    let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();

    aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
        client.insert_aux(values, weight_keys.iter())
    })
}

fn query_epoch_changes<Block, Client>(
    epoch_changes: &SharedEpochChanges<Block, Epoch>,
    client: &Client,
    config: &BabeConfiguration,
    block_number: NumberFor<Block>,
    slot: Slot,
    parent_hash: Block::Hash,
) -> Result<
    (ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
    Error<Block>,
>
where
    Block: BlockT,
    Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
{
    let epoch_changes = epoch_changes.shared_data();
    let epoch_descriptor = epoch_changes
        .epoch_descriptor_for_child_of(
            descendent_query(client),
            &parent_hash,
            block_number - 1u32.into(),
            slot,
        )
        .map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
        .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
    let viable_epoch = epoch_changes
        .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
        .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
    Ok((epoch_descriptor, viable_epoch.into_cloned()))
}