1#![forbid(unsafe_code)]
67#![warn(missing_docs)]
68
69use std::{
70 collections::HashSet,
71 future::Future,
72 ops::{Deref, DerefMut},
73 pin::Pin,
74 sync::Arc,
75 task::{Context, Poll},
76 time::Duration,
77};
78
79use codec::{Decode, Encode};
80use futures::{
81 channel::{
82 mpsc::{channel, Receiver, Sender},
83 oneshot,
84 },
85 prelude::*,
86};
87use log::{debug, info, log, trace, warn};
88use parking_lot::Mutex;
89use prometheus_endpoint::Registry;
90
91use pezsc_client_api::{
92 backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
93 PreCommitActions, UsageProvider,
94};
95use pezsc_consensus::{
96 block_import::{
97 BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
98 StateAction,
99 },
100 import_queue::{BasicQueue, BoxJustificationImport, DefaultImportQueue, Verifier},
101};
102use pezsc_consensus_epochs::{
103 descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
104 ViableEpochDescriptor,
105};
106use pezsc_consensus_slots::{
107 check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
108 SlotInfo, StorageChanges,
109};
110use pezsc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
111use pezsc_transaction_pool_api::OffchainTransactionPoolFactory;
112use pezsp_api::{ApiExt, ProvideRuntimeApi};
113use pezsp_application_crypto::AppCrypto;
114use pezsp_block_builder::BlockBuilder as BlockBuilderApi;
115use pezsp_blockchain::{
116 Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
117 Result as ClientResult,
118};
119use pezsp_consensus::{BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain};
120use pezsp_consensus_babe::{inherents::BabeInherentData, SlotDuration};
121use pezsp_consensus_slots::Slot;
122use pezsp_core::traits::SpawnEssentialNamed;
123use pezsp_inherents::{CreateInherentDataProviders, InherentDataProvider};
124use pezsp_keystore::KeystorePtr;
125use pezsp_runtime::{
126 generic::OpaqueDigestItemId,
127 traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
128 DigestItem,
129};
130
131pub use pezsc_consensus_slots::SlotProportion;
132pub use pezsp_consensus::SyncOracle;
133pub use pezsp_consensus_babe::{
134 digests::{
135 CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
136 PrimaryPreDigest, SecondaryPlainPreDigest,
137 },
138 AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
139 BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
140};
141
142pub use aux_schema::load_block_weight as block_weight;
143use pezsp_timestamp::Timestamp;
144
145mod migration;
146mod verification;
147
148pub mod authorship;
149pub mod aux_schema;
150#[cfg(test)]
151mod tests;
152
/// Log target passed to every `log` macro invocation in this module.
const LOG_TARGET: &str = "babe";

/// Byte-string label named as the VRF context for authoring scores.
// NOTE(review): no use of this constant is visible in this part of the file — confirm the consumer.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"bizinikiwi-babe-vrf";

/// Length associated with an authoring score; presumably a byte count — TODO confirm against
/// the code that consumes it.
const AUTHORING_SCORE_LENGTH: usize = 16;
160
/// BABE epoch information.
///
/// Newtype around [`pezsp_consensus_babe::Epoch`]; the wrapper exists so that this crate can
/// implement its own traits (such as the epoch-changes `Epoch` trait) for the primitive type.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(pezsp_consensus_babe::Epoch);
164
165impl Deref for Epoch {
166 type Target = pezsp_consensus_babe::Epoch;
167
168 fn deref(&self) -> &Self::Target {
169 &self.0
170 }
171}
172
173impl DerefMut for Epoch {
174 fn deref_mut(&mut self) -> &mut Self::Target {
175 &mut self.0
176 }
177}
178
179impl From<pezsp_consensus_babe::Epoch> for Epoch {
180 fn from(epoch: pezsp_consensus_babe::Epoch) -> Self {
181 Epoch(epoch)
182 }
183}
184
185impl EpochT for Epoch {
186 type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
187 type Slot = Slot;
188
189 fn increment(
190 &self,
191 (descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
192 ) -> Epoch {
193 pezsp_consensus_babe::Epoch {
194 epoch_index: self.epoch_index + 1,
195 start_slot: self.start_slot + self.duration,
196 duration: self.duration,
197 authorities: descriptor.authorities,
198 randomness: descriptor.randomness,
199 config,
200 }
201 .into()
202 }
203
204 fn start_slot(&self) -> Slot {
205 self.start_slot
206 }
207
208 fn end_slot(&self) -> Slot {
209 self.start_slot + self.duration
210 }
211}
212
213impl Epoch {
214 pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
218 pezsp_consensus_babe::Epoch {
219 epoch_index: 0,
220 start_slot: slot,
221 duration: genesis_config.epoch_length,
222 authorities: genesis_config.authorities.clone(),
223 randomness: genesis_config.randomness,
224 config: BabeEpochConfiguration {
225 c: genesis_config.c,
226 allowed_slots: genesis_config.allowed_slots,
227 },
228 }
229 .into()
230 }
231
232 pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
240 let mut epoch = self.clone();
241
242 let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;
243
244 let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
245 "epoch number is u64; it should be strictly smaller than number of slots; \
246 slots relate in some way to wall clock time; \
247 if u64 is not enough we should crash for safety; qed.",
248 );
249
250 let start_slot = skipped_epochs
251 .checked_mul(epoch.duration)
252 .and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
253 .expect(
254 "slot number is u64; it should relate in some way to wall clock time; \
255 if u64 is not enough we should crash for safety; qed.",
256 );
257
258 epoch.epoch_index = epoch_index;
259 epoch.start_slot = Slot::from(start_slot);
260
261 epoch
262 }
263}
264
/// Errors produced by the BABE consensus code in this module.
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
    /// More than one BABE pre-runtime digest was found in a header.
    #[error("Multiple BABE pre-runtime digests, rejecting!")]
    MultiplePreRuntimeDigests,
    /// No BABE pre-runtime digest was found in a header.
    #[error("No BABE pre-runtime digest found")]
    NoPreRuntimeDigest,
    /// More than one BABE epoch change digest was found in a header.
    #[error("Multiple BABE epoch change digests, rejecting!")]
    MultipleEpochChangeDigests,
    /// More than one BABE config change digest was found in a header.
    #[error("Multiple BABE config change digests, rejecting!")]
    MultipleConfigChangeDigests,
    /// Timestamp and slot could not be extracted.
    #[error("Could not extract timestamp and slot: {0}")]
    Extraction(ConsensusError),
    /// The epoch for the given block hash could not be fetched.
    #[error("Could not fetch epoch at {0:?}")]
    FetchEpoch(B::Hash),
    /// The header with the given hash was rejected for being too far in the future.
    #[error("Header {0:?} rejected: too far in the future")]
    TooFarInFuture(B::Hash),
    /// The parent (first hash) of the block (second hash) is unavailable.
    #[error("Parent ({0}) of {1} unavailable. Cannot import")]
    ParentUnavailable(B::Hash, B::Hash),
    /// The slot did not increase; carries the parent slot and this block's slot.
    #[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
    SlotMustIncrease(Slot, Slot),
    /// The header with the given hash has a bad seal.
    #[error("Header {0:?} has a bad seal")]
    HeaderBadSeal(B::Hash),
    /// The header with the given hash is unsealed.
    #[error("Header {0:?} is unsealed")]
    HeaderUnsealed(B::Hash),
    /// The author of the slot could not be found.
    #[error("Slot author not found")]
    SlotAuthorNotFound,
    /// Secondary slot assignments are disabled for the current epoch.
    #[error("Secondary slot assignments are disabled for the current epoch.")]
    SecondarySlotAssignmentsDisabled,
    /// Bad signature on the header with the given hash.
    #[error("Bad signature on {0:?}")]
    BadSignature(B::Hash),
    /// Invalid author; carries the expected secondary author and the actual one.
    #[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
    InvalidAuthor(AuthorityId, AuthorityId),
    /// No secondary author was expected.
    #[error("No secondary author expected.")]
    NoSecondaryAuthorExpected,
    /// VRF verification failed.
    #[error("VRF verification failed")]
    VrfVerificationFailed,
    /// The VRF output was rejected because the carried threshold was exceeded.
    #[error("VRF output rejected, threshold {0} exceeded")]
    VrfThresholdExceeded(u128),
    /// The parent header could not be fetched.
    #[error("Could not fetch parent header: {0}")]
    FetchParentHeader(pezsp_blockchain::Error),
    /// An epoch change was expected at the given block hash and slot.
    #[error("Expected epoch change to happen at {0:?}, s{1}")]
    ExpectedEpochChange(B::Hash, Slot),
    /// A config change was signalled unexpectedly.
    #[error("Unexpected config change")]
    UnexpectedConfigChange,
    /// An epoch change was signalled unexpectedly.
    #[error("Unexpected epoch change")]
    UnexpectedEpochChange,
    /// The parent of the given block has no associated weight.
    #[error("Parent block of {0} has no associated weight")]
    ParentBlockNoAssociatedWeight(B::Hash),
    /// Checking inherents failed.
    #[error("Checking inherents failed: {0}")]
    CheckInherents(pezsp_inherents::Error),
    /// Checking inherents produced an error that was not handled; carries the inherent
    /// identifier, which is rendered as lossy UTF-8 in the message.
    #[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
    CheckInherentsUnhandled(pezsp_inherents::InherentIdentifier),
    /// Creating inherents failed.
    #[error("Creating inherents failed: {0}")]
    CreateInherents(pezsp_inherents::Error),
    /// The background worker is not running.
    #[error("Background worker is not running")]
    BackgroundWorkerTerminated,
    /// Client error, displayed transparently.
    #[error(transparent)]
    Client(pezsp_blockchain::Error),
    /// Runtime API error, displayed transparently.
    #[error(transparent)]
    RuntimeApi(pezsp_api::ApiError),
    /// Fork tree error, displayed transparently (boxed).
    #[error(transparent)]
    ForkTree(Box<pez_fork_tree::Error<pezsp_blockchain::Error>>),
}
359
// Lets a BABE error be turned into its `Display` text with `.into()` / `?` wherever a
// `String` error is expected (e.g. `Verifier::verify` in this file returns `Result<_, String>`).
impl<B: BlockT> From<Error<B>> for String {
    fn from(error: Error<B>) -> String {
        error.to_string()
    }
}
365
/// Log `error` at debug level under [`LOG_TARGET`] and hand it back unchanged, so it can be
/// used inline where the error is constructed.
fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
    debug!(target: LOG_TARGET, "{}", error);
    error
}
370
/// Intermediate value attached to a block's import parameters under [`INTERMEDIATE_KEY`]
/// (see the `insert_intermediate` calls in this file).
pub struct BabeIntermediate<B: BlockT> {
    /// The epoch descriptor associated with the block being imported.
    pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}
376
/// Key under which a [`BabeIntermediate`] is stored in a block's import parameters.
pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
379
380pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
382where
383 C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
384 C::Api: BabeApi<B>,
385{
386 let at_hash = if client.usage_info().chain.finalized_state.is_some() {
387 client.usage_info().chain.best_hash
388 } else {
389 debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
390 client.usage_info().chain.genesis_hash
391 };
392
393 let runtime_api = client.runtime_api();
394 let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
395
396 let config = match version {
397 Some(1) => {
398 #[allow(deprecated)]
399 {
400 runtime_api.configuration_before_version_2(at_hash)?.into()
401 }
402 },
403 Some(2) => runtime_api.configuration(at_hash)?,
404 _ => {
405 return Err(pezsp_blockchain::Error::VersionInvalid(
406 "Unsupported or invalid BabeApi version".to_string(),
407 ))
408 },
409 };
410 Ok(config)
411}
412
/// Parameters consumed by [`start_babe`].
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
    /// Keystore handed to the slot worker, which uses it to claim slots and to sign the
    /// headers of authored blocks.
    pub keystore: KeystorePtr,

    /// Client handle handed to the slot worker (used there for epoch lookups and chain info).
    pub client: Arc<C>,

    /// Chain selection strategy, forwarded to the slot worker runner.
    pub select_chain: SC,

    /// Environment from which the slot worker initializes block proposers.
    pub env: E,

    /// Block import object that the slot worker exposes for importing authored blocks.
    // NOTE(review): presumably this must be (or wrap) a `BabeBlockImport` — confirm.
    pub block_import: I,

    /// Sync oracle, given both to the slot worker and to the slot worker runner.
    pub sync_oracle: SO,

    /// Justification sync link exposed by the slot worker.
    pub justification_sync_link: L,

    /// Creates the inherent data providers used by the slot worker runner.
    pub create_inherent_data_providers: CIDP,

    /// Value the slot worker reports from `force_authoring()`.
    pub force_authoring: bool,

    /// Optional strategy consulted by the slot worker to decide whether to back off from
    /// authoring blocks.
    pub backoff_authoring_blocks: Option<BS>,

    /// Link providing the shared epoch changes and the BABE configuration.
    pub babe_link: BabeLink<B>,

    /// Slot proportion passed to the computation of the remaining proposing duration.
    pub block_proposal_slot_portion: SlotProportion,

    /// Optional upper slot proportion passed to the same computation.
    pub max_block_proposal_slot_portion: Option<SlotProportion>,

    /// Optional telemetry handle exposed by the slot worker.
    pub telemetry: Option<TelemetryHandle>,
}
464
465pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
467 BabeParams {
468 keystore,
469 client,
470 select_chain,
471 env,
472 block_import,
473 sync_oracle,
474 justification_sync_link,
475 create_inherent_data_providers,
476 force_authoring,
477 backoff_authoring_blocks,
478 babe_link,
479 block_proposal_slot_portion,
480 max_block_proposal_slot_portion,
481 telemetry,
482 }: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
483) -> Result<BabeWorker<B>, ConsensusError>
484where
485 B: BlockT,
486 C: ProvideRuntimeApi<B>
487 + HeaderBackend<B>
488 + HeaderMetadata<B, Error = ClientError>
489 + Send
490 + Sync
491 + 'static,
492 C::Api: BabeApi<B>,
493 SC: SelectChain<B> + 'static,
494 E: Environment<B, Error = Error> + Send + Sync + 'static,
495 E::Proposer: Proposer<B, Error = Error>,
496 I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
497 SO: SyncOracle + Send + Sync + Clone + 'static,
498 L: pezsc_consensus::JustificationSyncLink<B> + 'static,
499 CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
500 CIDP::InherentDataProviders: InherentDataProviderExt + Send,
501 BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
502 Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
503{
504 let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));
505
506 let worker = BabeSlotWorker {
507 client: client.clone(),
508 block_import,
509 env,
510 sync_oracle: sync_oracle.clone(),
511 justification_sync_link,
512 force_authoring,
513 backoff_authoring_blocks,
514 keystore,
515 epoch_changes: babe_link.epoch_changes.clone(),
516 slot_notification_sinks: slot_notification_sinks.clone(),
517 config: babe_link.config.clone(),
518 block_proposal_slot_portion,
519 max_block_proposal_slot_portion,
520 telemetry,
521 };
522
523 info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");
524
525 let slot_worker = pezsc_consensus_slots::start_slot_worker(
526 babe_link.config.slot_duration(),
527 select_chain,
528 pezsc_consensus_slots::SimpleSlotWorkerToSlotWorker(worker),
529 sync_oracle,
530 create_inherent_data_providers,
531 );
532
533 Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
534}
535
536fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
540 client: &C,
541 notification: &FinalityNotification<Block>,
542) -> AuxDataOperations {
543 let mut hashes = HashSet::new();
544
545 let first = notification.tree_route.first().unwrap_or(¬ification.hash);
546 match client.header_metadata(*first) {
547 Ok(meta) => {
548 hashes.insert(meta.parent);
549 },
550 Err(err) => {
551 warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
552 },
553 }
554
555 hashes.extend(
557 notification
558 .tree_route
559 .iter()
560 .filter(|h| **h != notification.hash),
563 );
564
565 hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));
566
567 hashes
568 .into_iter()
569 .map(|val| (aux_schema::block_weight_key(val), None))
570 .collect()
571}
572
573async fn answer_requests<B: BlockT, C>(
574 mut request_rx: Receiver<BabeRequest<B>>,
575 config: BabeConfiguration,
576 client: Arc<C>,
577 epoch_changes: SharedEpochChanges<B, Epoch>,
578) where
579 C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
580{
581 while let Some(request) = request_rx.next().await {
582 match request {
583 BabeRequest::EpochData(response) => {
584 let _ = response.send(epoch_changes.shared_data().clone());
585 },
586 BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
587 let lookup = || {
588 let epoch_changes = epoch_changes.shared_data();
589 epoch_changes
590 .epoch_data_for_child_of(
591 descendent_query(&*client),
592 &parent_hash,
593 parent_number,
594 slot,
595 |slot| Epoch::genesis(&config, slot),
596 )
597 .map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
598 .ok_or(Error::<B>::FetchEpoch(parent_hash))
599 };
600
601 let _ = response.send(lookup());
602 },
603 }
604 }
605}
606
/// Requests handled by `answer_requests`, sent through a [`BabeWorkerHandle`].
enum BabeRequest<B: BlockT> {
    /// Request a copy of the current epoch changes; the reply goes through the sender.
    EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
    /// Request the epoch for a child of the given parent (hash, number) at the given slot;
    /// the reply, or the lookup error, goes through the sender.
    EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}
616
/// Cloneable handle for sending requests to the background task that answers BABE queries.
///
/// Wraps the sending half of the request channel.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
620
621impl<B: BlockT> BabeWorkerHandle<B> {
622 async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
623 match self.0.clone().send(request).await {
624 Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
625 Err(err) => warn!(
626 target: LOG_TARGET,
627 "Unhandled error when sending request to worker: {:?}", err
628 ),
629 _ => {},
630 }
631
632 Ok(())
633 }
634
635 pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
637 let (tx, rx) = oneshot::channel();
638 self.send_request(BabeRequest::EpochData(tx)).await?;
639
640 rx.await.or(Err(Error::BackgroundWorkerTerminated))
641 }
642
643 pub async fn epoch_data_for_child_of(
647 &self,
648 parent_hash: B::Hash,
649 parent_number: NumberFor<B>,
650 slot: Slot,
651 ) -> Result<Epoch, Error<B>> {
652 let (tx, rx) = oneshot::channel();
653 self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
654 .await?;
655
656 rx.await.or(Err(Error::BackgroundWorkerTerminated))?
657 }
658}
659
/// Future driving the BABE slot worker; it must be polled for authoring to make progress
/// (hence `#[must_use]`).
#[must_use]
pub struct BabeWorker<B: BlockT> {
    /// The boxed slot worker future that `poll` delegates to.
    inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    /// Senders registered through `slot_notification_stream`.
    slot_notification_sinks: SlotNotificationSinks<B>,
}
666
667impl<B: BlockT> BabeWorker<B> {
668 pub fn slot_notification_stream(
671 &self,
672 ) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
673 const CHANNEL_BUFFER_SIZE: usize = 1024;
674
675 let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
676 self.slot_notification_sinks.lock().push(sink);
677 stream
678 }
679}
680
681impl<B: BlockT> Future for BabeWorker<B> {
682 type Output = ();
683
684 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
685 self.inner.as_mut().poll(cx)
686 }
687}
688
/// Shared, mutex-protected list of senders that receive `(slot, epoch descriptor)` pairs.
type SlotNotificationSinks<B> = Arc<
    Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;
693
/// State of the BABE slot worker; implements `SimpleSlotWorker`.
struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
    /// Client used for epoch lookups and chain info.
    client: Arc<C>,
    /// Block import exposed through `block_import()`.
    block_import: I,
    /// Environment from which proposers are initialized.
    env: E,
    /// Sync oracle exposed through `sync_oracle()`.
    sync_oracle: SO,
    /// Justification sync link exposed through `justification_sync_link()`.
    justification_sync_link: L,
    /// Value returned by `force_authoring()`.
    force_authoring: bool,
    /// Optional strategy consulted by `should_backoff()`.
    backoff_authoring_blocks: Option<BS>,
    /// Keystore used to claim slots and to sign authored headers.
    keystore: KeystorePtr,
    /// Shared epoch changes.
    epoch_changes: SharedEpochChanges<B, Epoch>,
    /// Senders notified on every `notify_slot()` call.
    slot_notification_sinks: SlotNotificationSinks<B>,
    /// BABE configuration, used to build the genesis epoch when needed.
    config: BabeConfiguration,
    /// Slot proportion passed to the remaining-proposing-duration computation.
    block_proposal_slot_portion: SlotProportion,
    /// Optional upper slot proportion passed to the same computation.
    max_block_proposal_slot_portion: Option<SlotProportion>,
    /// Optional telemetry handle returned by `telemetry()`.
    telemetry: Option<TelemetryHandle>,
}
710
// Slot-worker implementation for BABE: epoch lookup, slot claiming, sealing of authored
// blocks and the accessors required by the generic slot worker.
#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> pezsc_consensus_slots::SimpleSlotWorker<B>
    for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
    B: BlockT,
    C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
    C::Api: BabeApi<B>,
    E: Environment<B, Error = Error> + Send + Sync,
    E::Proposer: Proposer<B, Error = Error>,
    I: BlockImport<B> + Send + Sync + 'static,
    SO: SyncOracle + Send + Clone + Sync,
    L: pezsc_consensus::JustificationSyncLink<B>,
    BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
    Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
    /// A claim is the pre-digest for the slot together with the claiming authority's id.
    type Claim = (PreDigest, AuthorityId);
    type SyncOracle = SO;
    type JustificationSyncLink = L;
    type CreateProposer =
        Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
    type Proposer = E::Proposer;
    type BlockImport = I;
    /// The auxiliary data is the epoch descriptor for the block being authored.
    type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;

    /// Log target used by the generic slot worker.
    fn logging_target(&self) -> &'static str {
        LOG_TARGET
    }

    /// Mutable access to the block import.
    fn block_import(&mut self) -> &mut Self::BlockImport {
        &mut self.block_import
    }

    /// Look up the epoch descriptor for a child of `parent` at `slot`.
    ///
    /// Lookup failures are reported as `ChainLookup`; a missing descriptor as
    /// `InvalidAuthoritiesSet`.
    fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
        self.epoch_changes
            .shared_data()
            .epoch_descriptor_for_child_of(
                descendent_query(&*self.client),
                &parent.hash(),
                *parent.number(),
                slot,
            )
            .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
            .ok_or(ConsensusError::InvalidAuthoritiesSet)
    }

    /// Number of authorities in the epoch described by `epoch_descriptor`, or `None` if the
    /// epoch cannot be resolved.
    fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
        self.epoch_changes
            .shared_data()
            .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
            .map(|epoch| epoch.as_ref().authorities.len())
    }

    /// Try to claim `slot` using the keys in the keystore.
    ///
    /// Returns `None` if the epoch cannot be resolved or no claim could be made.
    async fn claim_slot(
        &mut self,
        _parent_header: &B::Header,
        slot: Slot,
        epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
    ) -> Option<Self::Claim> {
        debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
        let s = authorship::claim_slot(
            slot,
            self.epoch_changes
                .shared_data()
                // The `?` returns `None` from `claim_slot` when the epoch is not viable.
                .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
                .as_ref(),
            &self.keystore,
        );

        if s.is_some() {
            debug!(target: LOG_TARGET, "Claimed slot {}", slot);
        }

        s
    }

    /// Send `(slot, epoch_descriptor)` to every registered sink.
    ///
    /// A sink whose channel is full is kept (with a warning); a sink failing with any other
    /// error is removed from the list.
    fn notify_slot(
        &self,
        _parent_header: &B::Header,
        slot: Slot,
        epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
    ) {
        let sinks = &mut self.slot_notification_sinks.lock();
        sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
            Ok(()) => true,
            Err(e) => {
                if e.is_full() {
                    warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
                    true
                } else {
                    false
                }
            },
        });
    }

    /// Digest items to put in the header before execution: the BABE pre-digest of the claim.
    fn pre_digest_data(&self, _slot: Slot, claim: &Self::Claim) -> Vec<pezsp_runtime::DigestItem> {
        vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
    }

    /// Build the import parameters for a freshly authored block.
    ///
    /// The header hash is signed with the claiming authority's key and the signature is
    /// appended as a BABE seal post-digest; body, storage changes and the epoch descriptor
    /// (as a [`BabeIntermediate`]) are attached.
    ///
    /// Fails with `CannotSign` if signing errors or the key is not in the keystore.
    async fn block_import_params(
        &self,
        header: B::Header,
        header_hash: &B::Hash,
        body: Vec<B::Extrinsic>,
        storage_changes: StorageChanges<B>,
        (_, public): Self::Claim,
        epoch_descriptor: Self::AuxData,
    ) -> Result<BlockImportParams<B>, ConsensusError> {
        let signature = self
            .keystore
            .sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
            .map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
            .ok_or_else(|| {
                ConsensusError::CannotSign(format!(
                    "Could not find key in keystore. Key: {:?}",
                    public
                ))
            })?;

        let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());

        let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
        import_block.post_digests.push(digest_item);
        import_block.body = Some(body);
        import_block.state_action =
            StateAction::ApplyChanges(pezsc_consensus::StorageChanges::Changes(storage_changes));
        import_block
            .insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });

        Ok(import_block)
    }

    /// Whether authoring is forced, as configured.
    fn force_authoring(&self) -> bool {
        self.force_authoring
    }

    /// Ask the configured backoff strategy whether authoring at `slot` should be skipped.
    ///
    /// Returns `false` when no strategy is configured or the chain head has no readable
    /// pre-digest.
    fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
        if let Some(ref strategy) = self.backoff_authoring_blocks {
            if let Ok(chain_head_slot) =
                find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
            {
                return strategy.should_backoff(
                    *chain_head.number(),
                    chain_head_slot,
                    self.client.info().finalized_number,
                    slot,
                    self.logging_target(),
                );
            }
        }
        false
    }

    /// Mutable access to the sync oracle.
    fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
        &mut self.sync_oracle
    }

    /// Mutable access to the justification sync link.
    fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
        &mut self.justification_sync_link
    }

    /// Initialize a proposer on top of `block`; initialization errors are reported as
    /// `ClientImport` carrying the error's text.
    fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
        Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
    }

    /// A clone of the optional telemetry handle.
    fn telemetry(&self) -> Option<TelemetryHandle> {
        self.telemetry.clone()
    }

    /// Time left for proposing in the given slot.
    ///
    /// Delegates to the shared helper with exponential slot lenience, passing the chain
    /// head's slot when its pre-digest can be read.
    fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
        let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

        pezsc_consensus_slots::proposing_remaining_duration(
            parent_slot,
            slot_info,
            &self.block_proposal_slot_portion,
            self.max_block_proposal_slot_portion.as_ref(),
            pezsc_consensus_slots::SlotLenienceType::Exponential,
            self.logging_target(),
        )
    }
}
893
894pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
897 if header.number().is_zero() {
900 return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
901 slot: 0.into(),
902 authority_index: 0,
903 }));
904 }
905
906 let mut pre_digest: Option<_> = None;
907 for log in header.digest().logs() {
908 trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
909 match (log.as_babe_pre_digest(), pre_digest.is_some()) {
910 (Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
911 (None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
912 (s, false) => pre_digest = s,
913 }
914 }
915 pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
916}
917
918pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
920 find_next_epoch_digest::<B>(header).ok().flatten().is_some()
921}
922
923pub fn find_next_epoch_digest<B: BlockT>(
925 header: &B::Header,
926) -> Result<Option<NextEpochDescriptor>, Error<B>> {
927 let mut epoch_digest: Option<_> = None;
928 for log in header.digest().logs() {
929 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
930 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
931 match (log, epoch_digest.is_some()) {
932 (Some(ConsensusLog::NextEpochData(_)), true) => {
933 return Err(babe_err(Error::MultipleEpochChangeDigests))
934 },
935 (Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
936 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
937 }
938 }
939
940 Ok(epoch_digest)
941}
942
943fn find_next_config_digest<B: BlockT>(
945 header: &B::Header,
946) -> Result<Option<NextConfigDescriptor>, Error<B>> {
947 let mut config_digest: Option<_> = None;
948 for log in header.digest().logs() {
949 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
950 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
951 match (log, config_digest.is_some()) {
952 (Some(ConsensusLog::NextConfigData(_)), true) => {
953 return Err(babe_err(Error::MultipleConfigChangeDigests))
954 },
955 (Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
956 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
957 }
958 }
959
960 Ok(config_digest)
961}
962
/// Shared state linking the BABE components: the epoch changes and the BABE configuration.
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
    /// Shared epoch changes.
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    /// The BABE configuration.
    config: BabeConfiguration,
}
969
impl<Block: BlockT> BabeLink<Block> {
    /// The shared epoch changes held by this link.
    pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
        &self.epoch_changes
    }

    /// The BABE configuration held by this link.
    pub fn config(&self) -> &BabeConfiguration {
        &self.config
    }
}
981
/// Import-queue verifier for BABE blocks.
pub struct BabeVerifier<Block: BlockT, Client> {
    /// Client used for the state/gap sync check and for epoch lookups.
    client: Arc<Client>,
    /// Slot duration used to turn the current timestamp into the current slot.
    slot_duration: SlotDuration,
    /// BABE configuration passed to the epoch lookup.
    config: BabeConfiguration,
    /// Shared epoch changes.
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    /// Optional telemetry handle.
    telemetry: Option<TelemetryHandle>,
}
990
#[async_trait::async_trait]
impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
where
    Block: BlockT,
    Client: HeaderMetadata<Block, Error = pezsp_blockchain::Error>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + AuxStore,
    Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
{
    /// Verify the header of `block` and prepare it for import.
    ///
    /// State-sync and gap-sync imports are passed through unchecked. Otherwise the BABE
    /// pre-digest is extracted, the epoch is looked up and the header is checked. On success
    /// the seal is moved from the header into the post-digests, the epoch descriptor is
    /// attached as a [`BabeIntermediate`] and the post-hash is set to the original header hash.
    ///
    /// Errors are returned as strings (via `From<Error<B>> for String`); a deferred header is
    /// rejected as `TooFarInFuture`.
    async fn verify(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<BlockImportParams<Block>, String> {
        trace!(
            target: LOG_TARGET,
            "Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
            block.origin,
            block.header,
            block.justifications,
            block.body,
        );

        // Hash of the sealed header; it becomes the post-hash once the seal is stripped.
        let hash = block.header.hash();
        let parent_hash = *block.header.parent_hash();

        let number = block.header.number();

        // Nothing is verified for state sync / gap sync imports.
        if is_state_sync_or_gap_sync_import(&*self.client, &block) {
            return Ok(block);
        }

        debug!(
            target: LOG_TARGET,
            "We have {:?} logs in this header",
            block.header.digest().logs().len()
        );

        let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);

        let pre_digest = find_pre_digest::<Block>(&block.header)?;
        let (check_header, epoch_descriptor) = {
            let (epoch_descriptor, viable_epoch) = query_epoch_changes(
                &self.epoch_changes,
                self.client.as_ref(),
                &self.config,
                *number,
                pre_digest.slot(),
                parent_hash,
            )?;

            let v_params = verification::VerificationParams {
                header: block.header.clone(),
                pre_digest: Some(pre_digest),
                // One slot of tolerance on top of the local clock — presumably to allow
                // for small clock drift; TODO confirm.
                slot_now: slot_now + 1,
                epoch: viable_epoch.as_ref(),
            };

            (verification::check_header::<Block>(v_params)?, epoch_descriptor)
        };

        match check_header {
            CheckedHeader::Checked(pre_header, verified_info) => {
                trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
                telemetry!(
                    self.telemetry;
                    CONSENSUS_TRACE;
                    "babe.checked_and_importing";
                    "pre_header" => ?pre_header,
                );

                // Replace the header with the checked pre-header and keep the seal as a
                // post-digest.
                block.header = pre_header;
                block.post_digests.push(verified_info.seal);
                block.insert_intermediate(
                    INTERMEDIATE_KEY,
                    BabeIntermediate::<Block> { epoch_descriptor },
                );
                block.post_hash = Some(hash);

                Ok(block)
            },
            CheckedHeader::Deferred(a, b) => {
                debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
                telemetry!(
                    self.telemetry;
                    CONSENSUS_DEBUG;
                    "babe.header_too_far_in_future";
                    "hash" => ?hash, "a" => ?a, "b" => ?b
                );
                Err(Error::<Block>::TooFarInFuture(hash).into())
            },
        }
    }
}
1089
1090fn is_state_sync_or_gap_sync_import<B: BlockT>(
1096 client: &impl HeaderBackend<B>,
1097 block: &BlockImportParams<B>,
1098) -> bool {
1099 let number = *block.header.number();
1100 let info = client.info();
1101 info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
1102 || block.with_state()
1103}
1104
/// Block import wrapper for BABE.
///
/// Wraps an inner block import and keeps the shared epoch changes and block weights up to
/// date (see `import_state` for the state-import path).
pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
    /// The wrapped block import that performs the actual import.
    inner: I,
    /// Client used for runtime API calls and auxiliary storage.
    client: Arc<Client>,
    /// Shared epoch changes.
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    /// Creates the inherent data providers used when checking inherents.
    create_inherent_data_providers: CIDP,
    /// The BABE configuration.
    config: BabeConfiguration,
    /// Chain selection strategy.
    // NOTE(review): its use is not visible in this part of the file — confirm.
    select_chain: SC,
    /// Factory for offchain transaction pools.
    // NOTE(review): its use is not visible in this part of the file — confirm.
    offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}
1129
1130impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
1131 for BabeBlockImport<Block, Client, I, CIDP, SC>
1132{
1133 fn clone(&self) -> Self {
1134 BabeBlockImport {
1135 inner: self.inner.clone(),
1136 client: self.client.clone(),
1137 epoch_changes: self.epoch_changes.clone(),
1138 config: self.config.clone(),
1139 create_inherent_data_providers: self.create_inherent_data_providers.clone(),
1140 select_chain: self.select_chain.clone(),
1141 offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
1142 }
1143 }
1144}
1145
1146impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
1147 fn new(
1148 client: Arc<Client>,
1149 epoch_changes: SharedEpochChanges<Block, Epoch>,
1150 block_import: I,
1151 config: BabeConfiguration,
1152 create_inherent_data_providers: CIDP,
1153 select_chain: SC,
1154 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1155 ) -> Self {
1156 BabeBlockImport {
1157 client,
1158 inner: block_import,
1159 epoch_changes,
1160 config,
1161 create_inherent_data_providers,
1162 select_chain,
1163 offchain_tx_pool_factory,
1164 }
1165 }
1166}
1167
1168impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
1169where
1170 Block: BlockT,
1171 Inner: BlockImport<Block> + Send + Sync,
1172 Inner::Error: Into<ConsensusError>,
1173 Client: HeaderBackend<Block>
1174 + HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1175 + AuxStore
1176 + ProvideRuntimeApi<Block>
1177 + Send
1178 + Sync,
1179 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1180 CIDP: CreateInherentDataProviders<Block, ()>,
1181 CIDP::InherentDataProviders: InherentDataProviderExt + Send,
1182 SC: pezsp_consensus::SelectChain<Block> + 'static,
1183{
1184 async fn import_state(
1188 &self,
1189 mut block: BlockImportParams<Block>,
1190 ) -> Result<ImportResult, ConsensusError> {
1191 let hash = block.post_hash();
1192 let parent_hash = *block.header.parent_hash();
1193 let number = *block.header.number();
1194
1195 block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
1196 aux_schema::write_block_weight(hash, 0, |values| {
1198 block
1199 .auxiliary
1200 .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
1201 });
1202
1203 let import_result = self.inner.import_block(block).await;
1205 let aux = match import_result {
1206 Ok(ImportResult::Imported(aux)) => aux,
1207 Ok(r) => {
1208 return Err(ConsensusError::ClientImport(format!(
1209 "Unexpected import result: {:?}",
1210 r
1211 )))
1212 },
1213 Err(r) => return Err(r.into()),
1214 };
1215
1216 let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
1218 ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1219 })?;
1220 let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
1221 ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
1222 })?;
1223
1224 let mut epoch_changes = self.epoch_changes.shared_data_locked();
1225 epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
1226 aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
1227 self.client.insert_aux(insert, [])
1228 })
1229 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1230
1231 Ok(ImportResult::Imported(aux))
1232 }
1233
1234 async fn check_inherents_and_equivocations(
1236 &self,
1237 block: &mut BlockImportParams<Block>,
1238 ) -> Result<(), ConsensusError> {
1239 if is_state_sync_or_gap_sync_import(&*self.client, block) {
1240 return Ok(());
1241 }
1242
1243 let parent_hash = *block.header.parent_hash();
1244 let number = *block.header.number();
1245
1246 let create_inherent_data_providers = self
1247 .create_inherent_data_providers
1248 .create_inherent_data_providers(parent_hash, ())
1249 .await?;
1250
1251 let slot_now = create_inherent_data_providers.slot();
1252
1253 let babe_pre_digest = find_pre_digest::<Block>(&block.header)
1254 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1255 let slot = babe_pre_digest.slot();
1256
1257 self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
1259 .await?;
1260
1261 let author = {
1263 let viable_epoch = query_epoch_changes(
1264 &self.epoch_changes,
1265 self.client.as_ref(),
1266 &self.config,
1267 number,
1268 slot,
1269 parent_hash,
1270 )
1271 .map_err(|e| ConsensusError::Other(babe_err(e).into()))?
1272 .1;
1273 match viable_epoch
1274 .as_ref()
1275 .authorities
1276 .get(babe_pre_digest.authority_index() as usize)
1277 {
1278 Some(author) => author.0.clone(),
1279 None => {
1280 return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into()))
1281 },
1282 }
1283 };
1284 if let Err(err) = self
1285 .check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
1286 .await
1287 {
1288 warn!(
1289 target: LOG_TARGET,
1290 "Error checking/reporting BABE equivocation: {}", err
1291 );
1292 }
1293 Ok(())
1294 }
1295
1296 async fn check_inherents(
1297 &self,
1298 block: &mut BlockImportParams<Block>,
1299 at_hash: Block::Hash,
1300 slot: Slot,
1301 create_inherent_data_providers: CIDP::InherentDataProviders,
1302 ) -> Result<(), ConsensusError> {
1303 if block.state_action.skip_execution_checks() {
1304 return Ok(());
1305 }
1306
1307 if let Some(inner_body) = block.body.take() {
1308 let new_block = Block::new(block.header.clone(), inner_body);
1309 let mut inherent_data = create_inherent_data_providers
1313 .create_inherent_data()
1314 .await
1315 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1316 inherent_data.babe_replace_inherent_data(slot);
1317
1318 use pezsp_block_builder::CheckInherentsError;
1319
1320 pezsp_block_builder::check_inherents_with_data(
1321 self.client.clone(),
1322 at_hash,
1323 new_block.clone(),
1324 &create_inherent_data_providers,
1325 inherent_data,
1326 )
1327 .await
1328 .map_err(|e| {
1329 ConsensusError::Other(Box::new(match e {
1330 CheckInherentsError::CreateInherentData(e) => {
1331 Error::<Block>::CreateInherents(e)
1332 },
1333 CheckInherentsError::Client(e) => Error::RuntimeApi(e),
1334 CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
1335 CheckInherentsError::CheckInherentsUnknownError(id) => {
1336 Error::CheckInherentsUnhandled(id)
1337 },
1338 }))
1339 })?;
1340 let (_, inner_body) = new_block.deconstruct();
1341 block.body = Some(inner_body);
1342 }
1343
1344 Ok(())
1345 }
1346
1347 async fn check_and_report_equivocation(
1348 &self,
1349 slot_now: Slot,
1350 slot: Slot,
1351 header: &Block::Header,
1352 author: &AuthorityId,
1353 origin: &BlockOrigin,
1354 ) -> Result<(), Error<Block>> {
1355 if *origin == BlockOrigin::NetworkInitialSync {
1358 return Ok(());
1359 }
1360
1361 let Some(equivocation_proof) =
1363 check_equivocation(&*self.client, slot_now, slot, header, author)
1364 .map_err(Error::Client)?
1365 else {
1366 return Ok(());
1367 };
1368
1369 info!(
1370 "Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
1371 author,
1372 slot,
1373 equivocation_proof.first_header.hash(),
1374 equivocation_proof.second_header.hash(),
1375 );
1376
1377 let best_hash = self
1379 .select_chain
1380 .best_chain()
1381 .await
1382 .map(|h| h.hash())
1383 .map_err(|e| Error::Client(e.into()))?;
1384
1385 let generate_key_owner_proof = |at_hash: Block::Hash| {
1394 self.client
1395 .runtime_api()
1396 .generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
1397 .map_err(Error::RuntimeApi)
1398 };
1399
1400 let parent_hash = *header.parent_hash();
1401 let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
1402 Some(proof) => proof,
1403 None => match generate_key_owner_proof(best_hash)? {
1404 Some(proof) => proof,
1405 None => {
1406 debug!(
1407 target: LOG_TARGET,
1408 "Equivocation offender is not part of the authority set."
1409 );
1410 return Ok(());
1411 },
1412 },
1413 };
1414
1415 let mut runtime_api = self.client.runtime_api();
1417
1418 runtime_api
1420 .register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
1421
1422 runtime_api
1423 .submit_report_equivocation_unsigned_extrinsic(
1424 best_hash,
1425 equivocation_proof,
1426 key_owner_proof,
1427 )
1428 .map_err(Error::RuntimeApi)?;
1429
1430 info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);
1431
1432 Ok(())
1433 }
1434}
1435
1436#[async_trait::async_trait]
1437impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
1438 for BabeBlockImport<Block, Client, Inner, CIDP, SC>
1439where
1440 Block: BlockT,
1441 Inner: BlockImport<Block> + Send + Sync,
1442 Inner::Error: Into<ConsensusError>,
1443 Client: HeaderBackend<Block>
1444 + HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1445 + AuxStore
1446 + ProvideRuntimeApi<Block>
1447 + Send
1448 + Sync,
1449 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1450 CIDP: CreateInherentDataProviders<Block, ()>,
1451 CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
1452 SC: SelectChain<Block> + 'static,
1453{
1454 type Error = ConsensusError;
1455
    /// Imports `block`, performing the BABE-specific bookkeeping before delegating to the
    /// wrapped block import.
    ///
    /// After the inherent/equivocation checks the method either:
    /// - forwards the block straight to the wrapped import (block inside the client's block
    ///   gap, or already in chain),
    /// - hands it to `import_state` (block carries state), or
    /// - validates slot ordering and epoch-change digests, tracks a newly announced epoch in
    ///   the shared epoch changes, queues the block weight and epoch data as auxiliary data,
    ///   sets the fork choice from the accumulated weights and finally calls the wrapped
    ///   import.
    ///
    /// If the wrapped import fails after an epoch change was tracked, the previous epoch
    /// changes are restored.
    ///
    /// # Errors
    ///
    /// Returns a [`ConsensusError`] for failed checks, lookups, digest validation, epoch
    /// tracking, or when the wrapped import fails.
    ///
    /// # Panics
    ///
    /// Panics if the block header or its parent header has no BABE pre-digest (both `expect`
    /// calls below).
    async fn import_block(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        let hash = block.post_hash();
        let number = *block.header.number();
        let info = self.client.info();

        self.check_inherents_and_equivocations(&mut block).await?;

        let block_status = self
            .client
            .status(hash)
            .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

        // Blocks that fall inside the client's reported block gap, or that are already in
        // the chain, skip all epoch logic below and go straight to the wrapped import.
        if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
            || block_status == BlockStatus::InChain
        {
            // Drop any BABE intermediate; a missing intermediate is deliberately ignored.
            let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
            block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
            return self.inner.import_block(block).await.map_err(Into::into);
        }

        if block.with_state() {
            return self.import_state(block).await;
        }

        let pre_digest = find_pre_digest::<Block>(&block.header).expect(
            "valid babe headers must contain a predigest; header has been already verified; qed",
        );
        let slot = pre_digest.slot();

        let parent_hash = *block.header.parent_hash();
        let parent_header = self
            .client
            .header(parent_hash)
            .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
            .ok_or_else(|| {
                ConsensusError::ChainLookup(
                    babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
                )
            })?;

        let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
            "parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
             been verified; qed",
        );

        // The slot must be strictly greater than the parent's slot.
        if slot <= parent_slot {
            return Err(ConsensusError::ClientImport(
                babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
            ));
        }

        // Snapshot of the epoch changes taken before tracking a new epoch; used to roll back
        // on failure. Stays `None` when this block announces no epoch change.
        let mut old_epoch_changes = None;

        // The inner scope ends with `release_mutex()`, whose result is kept in the outer
        // `epoch_changes` binding and used via `upgrade()` after the wrapped import.
        let mut epoch_changes = {
            let mut epoch_changes = self.epoch_changes.shared_data_locked();

            let (epoch_descriptor, first_in_epoch, parent_weight) = {
                // A parent with block number zero has weight 0; any other parent must have a
                // stored weight.
                let parent_weight = if *parent_header.number() == Zero::zero() {
                    0
                } else {
                    aux_schema::load_block_weight(&*self.client, parent_hash)
                        .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ClientImport(
                                babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
                                    .into(),
                            )
                        })?
                };

                // Here, unlike in the early-return path above, a missing intermediate is an
                // error.
                let intermediate =
                    block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;

                let epoch_descriptor = intermediate.epoch_descriptor;
                // The block is the first of its epoch on this chain when its parent's slot
                // lies before the epoch's start slot.
                let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
                (epoch_descriptor, first_in_epoch, parent_weight)
            };

            let total_weight = parent_weight + pre_digest.added_weight();

            let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
            let next_config_digest = find_next_config_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

            // A next-epoch digest must be present exactly when the block is first in its
            // epoch; a next-config digest is only accepted together with a next-epoch digest.
            match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
                (true, true, _) => {},
                (false, false, false) => {},
                (false, false, true) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedConfigChange).into(),
                    ))
                },
                (true, false, _) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
                    ))
                },
                (false, true, _) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedEpochChange).into(),
                    ))
                },
            }

            if let Some(next_epoch_descriptor) = next_epoch_digest {
                old_epoch_changes = Some((*epoch_changes).clone());

                let mut viable_epoch = epoch_changes
                    .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
                    .ok_or_else(|| {
                        ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
                    })?
                    .into_cloned();

                // Use the announced configuration if there is one, otherwise carry over the
                // configuration of the current epoch.
                let epoch_config = next_config_digest
                    .map(Into::into)
                    .unwrap_or_else(|| viable_epoch.as_ref().config.clone());

                // Epoch messages are logged at debug level for blocks from initial sync and
                // at info level otherwise.
                let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
                    log::Level::Debug
                } else {
                    log::Level::Info
                };

                // The block's slot is at or past the end of the epoch it was assigned to:
                // replace the epoch with the one `clone_for_slot` derives for this slot.
                if viable_epoch.as_ref().end_slot() <= slot {
                    let epoch = viable_epoch.as_mut();
                    let prev_index = epoch.epoch_index;
                    *epoch = epoch.clone_for_slot(slot);

                    warn!(
                        target: LOG_TARGET,
                        "👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
                    );
                }

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
                    viable_epoch.as_ref().epoch_index,
                    hash,
                    slot,
                    viable_epoch.as_ref().start_slot,
                );

                let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 Next epoch starts at slot {}",
                    next_epoch.as_ref().start_slot,
                );

                // Pruning runs before the new epoch is imported into the tree; keep this
                // order (NOTE(review): presumably because pruning's descendant queries cannot
                // yet know the block being imported; confirm before reordering).
                let prune_and_import = || {
                    prune_finalized(self.client.clone(), &mut epoch_changes)?;

                    epoch_changes
                        .import(
                            descendent_query(&*self.client),
                            hash,
                            number,
                            *block.header.parent_hash(),
                            next_epoch,
                        )
                        .map_err(|e| {
                            ConsensusError::ClientImport(format!(
                                "Error importing epoch changes: {}",
                                e
                            ))
                        })?;
                    Ok(())
                };

                // On failure restore the snapshot taken above before returning the error.
                if let Err(e) = prune_and_import() {
                    debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
                    *epoch_changes =
                        old_epoch_changes.expect("set `Some` above and not taken; qed");
                    return Err(e);
                }

                // Queue the updated epoch changes as auxiliary data of this block.
                crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
                    block
                        .auxiliary
                        .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
                });
            }

            // Queue the accumulated weight of this block as auxiliary data.
            aux_schema::write_block_weight(hash, total_weight, |values| {
                block
                    .auxiliary
                    .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
            });

            // Fork choice: `Custom(true)` when this block's total weight exceeds that of the
            // current best block, or equals it while this block has a higher number.
            block.fork_choice = {
                let (last_best, last_best_number) = (info.best_hash, info.best_number);

                let last_best_weight = if &last_best == block.header.parent_hash() {
                    parent_weight
                } else {
                    // NOTE(review): an error returned from this lookup leaves the function
                    // without restoring `old_epoch_changes`, although the in-memory epoch
                    // changes may already have been modified above; confirm whether a
                    // rollback is needed here. The message below also says "parent header"
                    // while the weight being loaded is that of the last best block.
                    aux_schema::load_block_weight(&*self.client, last_best)
                        .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ChainLookup(
                                "No block weight for parent header.".to_string(),
                            )
                        })?
                };

                Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
                    true
                } else if total_weight == last_best_weight {
                    number > last_best_number
                } else {
                    false
                }))
            };

            epoch_changes.release_mutex()
        };

        let import_result = self.inner.import_block(block).await;

        // Roll the epoch changes back to the snapshot if the wrapped import failed.
        if import_result.is_err() {
            if let Some(old_epoch_changes) = old_epoch_changes {
                *epoch_changes.upgrade() = old_epoch_changes;
            }
        }

        import_result.map_err(Into::into)
    }
1736
1737 async fn check_block(
1738 &self,
1739 block: BlockCheckParams<Block>,
1740 ) -> Result<ImportResult, Self::Error> {
1741 self.inner.check_block(block).await.map_err(Into::into)
1742 }
1743}
1744
1745fn prune_finalized<Block, Client>(
1747 client: Arc<Client>,
1748 epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1749) -> Result<(), ConsensusError>
1750where
1751 Block: BlockT,
1752 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = pezsp_blockchain::Error>,
1753{
1754 let info = client.info();
1755
1756 let finalized_slot = {
1757 let finalized_header = client
1758 .header(info.finalized_hash)
1759 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1760 .expect(
1761 "best finalized hash was given by client; finalized headers must exist in db; qed",
1762 );
1763
1764 find_pre_digest::<Block>(&finalized_header)
1765 .expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1766 .slot()
1767 };
1768
1769 epoch_changes
1770 .prune_finalized(
1771 descendent_query(&*client),
1772 &info.finalized_hash,
1773 info.finalized_number,
1774 finalized_slot,
1775 )
1776 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1777
1778 Ok(())
1779}
1780
1781pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1787 config: BabeConfiguration,
1788 wrapped_block_import: I,
1789 client: Arc<Client>,
1790 create_inherent_data_providers: CIDP,
1791 select_chain: SC,
1792 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1793) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1794where
1795 Client: AuxStore
1796 + HeaderBackend<Block>
1797 + HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1798 + PreCommitActions<Block>
1799 + 'static,
1800{
1801 let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1802 let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1803
1804 prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1808
1809 let client_weak = Arc::downgrade(&client);
1810 let on_finality = move |summary: &FinalityNotification<Block>| {
1811 if let Some(client) = client_weak.upgrade() {
1812 aux_storage_cleanup(client.as_ref(), summary)
1813 } else {
1814 Default::default()
1815 }
1816 };
1817 client.register_finality_action(Box::new(on_finality));
1818
1819 let import = BabeBlockImport::new(
1820 client,
1821 epoch_changes,
1822 wrapped_block_import,
1823 config,
1824 create_inherent_data_providers,
1825 select_chain,
1826 offchain_tx_pool_factory,
1827 );
1828
1829 Ok((import, link))
1830}
1831
/// Parameters passed to [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
    /// The BABE link; its configuration and epoch changes are shared with the verifier and
    /// the worker task.
    pub link: BabeLink<Block>,
    /// The block import that the queue hands verified blocks to.
    pub block_import: BI,
    /// Optional justification import passed on to the queue.
    pub justification_import: Option<BoxJustificationImport<Block>>,
    /// The client used by the verifier and the worker task.
    pub client: Arc<Client>,
    /// The slot duration given to the verifier.
    pub slot_duration: SlotDuration,
    /// Spawner used for the `babe-worker` task and by the import queue.
    pub spawner: &'a Spawn,
    /// Optional Prometheus registry passed on to the import queue.
    pub registry: Option<&'a Registry>,
    /// Optional telemetry handle given to the verifier.
    pub telemetry: Option<TelemetryHandle>,
}
1851
1852pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1862 ImportQueueParams {
1863 link: babe_link,
1864 block_import,
1865 justification_import,
1866 client,
1867 slot_duration,
1868 spawner,
1869 registry,
1870 telemetry,
1871 }: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1872) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1873where
1874 BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1875 Client: ProvideRuntimeApi<Block>
1876 + HeaderBackend<Block>
1877 + HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1878 + AuxStore
1879 + Send
1880 + Sync
1881 + 'static,
1882 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1883 Spawn: SpawnEssentialNamed,
1884{
1885 const HANDLE_BUFFER_SIZE: usize = 1024;
1886
1887 let verifier = BabeVerifier {
1888 slot_duration,
1889 config: babe_link.config.clone(),
1890 epoch_changes: babe_link.epoch_changes.clone(),
1891 telemetry,
1892 client: client.clone(),
1893 };
1894
1895 let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1896
1897 let answer_requests =
1898 answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1899
1900 spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1901
1902 Ok((
1903 BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1904 BabeWorkerHandle(worker_tx),
1905 ))
1906}
1907
1908pub fn revert<Block, Client, Backend>(
1912 client: Arc<Client>,
1913 backend: Arc<Backend>,
1914 blocks: NumberFor<Block>,
1915) -> ClientResult<()>
1916where
1917 Block: BlockT,
1918 Client: AuxStore
1919 + HeaderMetadata<Block, Error = pezsp_blockchain::Error>
1920 + HeaderBackend<Block>
1921 + ProvideRuntimeApi<Block>
1922 + UsageProvider<Block>,
1923 Client::Api: BabeApi<Block>,
1924 Backend: BackendT<Block>,
1925{
1926 let best_number = client.info().best_number;
1927 let finalized = client.info().finalized_number;
1928
1929 let revertible = blocks.min(best_number - finalized);
1930 if revertible == Zero::zero() {
1931 return Ok(());
1932 }
1933
1934 let revert_up_to_number = best_number - revertible;
1935 let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1936 format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1937 ))?;
1938
1939 let config = configuration(&*client)?;
1943 let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1944 let mut epoch_changes = epoch_changes.shared_data();
1945
1946 if revert_up_to_number == Zero::zero() {
1947 *epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1949 } else {
1950 epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1951 }
1952
1953 let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1956
1957 let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1958 pezsp_blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1959 .map(|route| route.retracted().is_empty())
1960 .unwrap_or_default()
1961 });
1962
1963 for leaf in leaves {
1964 let mut hash = leaf;
1965 loop {
1966 let meta = client.header_metadata(hash)?;
1967 if meta.number <= revert_up_to_number
1968 || !weight_keys.insert(aux_schema::block_weight_key(hash))
1969 {
1970 break;
1972 }
1973 hash = meta.parent;
1974 }
1975 }
1976
1977 let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1978
1979 aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1981 client.insert_aux(values, weight_keys.iter())
1982 })
1983}
1984
1985fn query_epoch_changes<Block, Client>(
1986 epoch_changes: &SharedEpochChanges<Block, Epoch>,
1987 client: &Client,
1988 config: &BabeConfiguration,
1989 block_number: NumberFor<Block>,
1990 slot: Slot,
1991 parent_hash: Block::Hash,
1992) -> Result<
1993 (ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
1994 Error<Block>,
1995>
1996where
1997 Block: BlockT,
1998 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = pezsp_blockchain::Error>,
1999{
2000 let epoch_changes = epoch_changes.shared_data();
2001 let epoch_descriptor = epoch_changes
2002 .epoch_descriptor_for_child_of(
2003 descendent_query(client),
2004 &parent_hash,
2005 block_number - 1u32.into(),
2006 slot,
2007 )
2008 .map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
2009 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2010 let viable_epoch = epoch_changes
2011 .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2012 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2013 Ok((epoch_descriptor, viable_epoch.into_cloned()))
2014}