1#![forbid(unsafe_code)]
55#![warn(missing_docs)]
56
57pub mod rpc;
58
59use std::{
60 collections::HashSet,
61 future::Future,
62 ops::{Deref, DerefMut},
63 pin::Pin,
64 sync::Arc,
65 task::{Context, Poll},
66 time::Duration,
67};
68
69use codec::{Decode, Encode};
70use futures::{
71 channel::{
72 mpsc::{channel, Receiver, Sender},
73 oneshot,
74 },
75 prelude::*,
76};
77use log::{debug, info, log, trace, warn};
78use parking_lot::Mutex;
79use soil_prometheus::Registry;
80
81use soil_client::blockchain::{
82 Backend as _, BlockStatus, Error as ClientError, HeaderBackend, HeaderMetadata,
83 Result as ClientResult,
84};
85use soil_client::client_api::{
86 backend::AuxStore, AuxDataOperations, Backend as BackendT, FinalityNotification,
87 PreCommitActions, UsageProvider,
88};
89use soil_client::consensus::{
90 BlockOrigin, Environment, Error as ConsensusError, Proposer, SelectChain,
91};
92use soil_client::import::{
93 BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxJustificationImport,
94 DefaultImportQueue, ForkChoiceStrategy, ImportResult, StateAction, Verifier,
95};
96use soil_client::transaction_pool::OffchainTransactionPoolFactory;
97use soil_consensus::epochs::{
98 descendent_query, Epoch as EpochT, EpochChangesFor, SharedEpochChanges, ViableEpoch,
99 ViableEpochDescriptor,
100};
101use soil_consensus::slots::{
102 check_equivocation, BackoffAuthoringBlocksStrategy, CheckedHeader, InherentDataProviderExt,
103 SlotInfo, StorageChanges,
104};
105use soil_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
106use subsoil::api::{ApiExt, ProvideRuntimeApi};
107use subsoil::application_crypto::AppCrypto;
108use subsoil::block_builder::BlockBuilder as BlockBuilderApi;
109use subsoil::consensus::babe::{inherents::BabeInherentData, SlotDuration};
110use subsoil::consensus::slots::Slot;
111use subsoil::core::traits::SpawnEssentialNamed;
112use subsoil::inherents::{CreateInherentDataProviders, InherentDataProvider};
113use subsoil::keystore::KeystorePtr;
114use subsoil::runtime::{
115 generic::OpaqueDigestItemId,
116 traits::{Block as BlockT, Header, NumberFor, SaturatedConversion, Zero},
117 DigestItem,
118};
119
120pub use soil_client::consensus::SyncOracle;
121pub use soil_consensus::slots::SlotProportion;
122pub use subsoil::consensus::babe::{
123 digests::{
124 CompatibleDigestItem, NextConfigDescriptor, NextEpochDescriptor, PreDigest,
125 PrimaryPreDigest, SecondaryPlainPreDigest,
126 },
127 AuthorityId, AuthorityPair, AuthoritySignature, BabeApi, BabeAuthorityWeight, BabeBlockWeight,
128 BabeConfiguration, BabeEpochConfiguration, ConsensusLog, Randomness, BABE_ENGINE_ID,
129};
130
131pub use aux_schema::load_block_weight as block_weight;
132use subsoil::timestamp::Timestamp;
133
134mod migration;
135mod verification;
136
137pub mod authorship;
138pub mod aux_schema;
139#[cfg(test)]
140mod tests;
141
/// Logging target for all messages emitted by this module.
const LOG_TARGET: &str = "babe";

/// VRF context string used when deriving the authoring score.
const AUTHORING_SCORE_VRF_CONTEXT: &[u8] = b"substrate-babe-vrf";

/// Number of bytes of VRF output taken as the authoring score.
const AUTHORING_SCORE_LENGTH: usize = 16;
149
/// BABE epoch information. Newtype wrapper around the runtime-declared epoch
/// type so that client-side traits (e.g. `EpochT`) can be implemented for it.
#[derive(Clone, Debug, PartialEq, Eq, Encode, Decode)]
pub struct Epoch(subsoil::consensus::babe::Epoch);
153
// Transparent read access to the wrapped runtime epoch.
impl Deref for Epoch {
    type Target = subsoil::consensus::babe::Epoch;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
161
// Transparent mutable access to the wrapped runtime epoch.
impl DerefMut for Epoch {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}
167
// Cheap conversion from the runtime epoch type into the client-side wrapper.
impl From<subsoil::consensus::babe::Epoch> for Epoch {
    fn from(epoch: subsoil::consensus::babe::Epoch) -> Self {
        Epoch(epoch)
    }
}
173
impl EpochT for Epoch {
    // The next epoch is described by the announced authorities/randomness plus
    // the epoch configuration that will be in force.
    type NextEpochDescriptor = (NextEpochDescriptor, BabeEpochConfiguration);
    type Slot = Slot;

    /// Produce the successor epoch: index advances by one, the start slot
    /// moves forward by exactly one epoch duration, and authorities,
    /// randomness and config are taken from the descriptor.
    fn increment(
        &self,
        (descriptor, config): (NextEpochDescriptor, BabeEpochConfiguration),
    ) -> Epoch {
        subsoil::consensus::babe::Epoch {
            epoch_index: self.epoch_index + 1,
            start_slot: self.start_slot + self.duration,
            duration: self.duration,
            authorities: descriptor.authorities,
            randomness: descriptor.randomness,
            config,
        }
        .into()
    }

    fn start_slot(&self) -> Slot {
        self.start_slot
    }

    // End slot is exclusive: first slot *after* the epoch.
    fn end_slot(&self) -> Slot {
        self.start_slot + self.duration
    }
}
201
impl Epoch {
    /// Create the genesis epoch (epoch #0), starting at the given slot and
    /// taking authorities, randomness and epoch config from the genesis
    /// configuration.
    pub fn genesis(genesis_config: &BabeConfiguration, slot: Slot) -> Epoch {
        subsoil::consensus::babe::Epoch {
            epoch_index: 0,
            start_slot: slot,
            duration: genesis_config.epoch_length,
            authorities: genesis_config.authorities.clone(),
            randomness: genesis_config.randomness,
            config: BabeEpochConfiguration {
                c: genesis_config.c,
                allowed_slots: genesis_config.allowed_slots,
            },
        }
        .into()
    }

    /// Clone this epoch, rebasing `epoch_index` and `start_slot` so that the
    /// given `slot` falls within the returned epoch. Whole epochs with no
    /// blocks in them ("skipped" epochs) are accounted for by integer
    /// division of the slot distance by the epoch duration.
    ///
    /// # Panics
    /// Panics (deliberately, for safety) if the rebased index or start slot
    /// would overflow `u64`.
    pub fn clone_for_slot(&self, slot: Slot) -> Epoch {
        let mut epoch = self.clone();

        // How many whole epochs lie between our start and the target slot.
        // `saturating_sub` yields 0 skipped epochs for slots at/before start.
        let skipped_epochs = *slot.saturating_sub(self.start_slot) / self.duration;

        let epoch_index = epoch.epoch_index.checked_add(skipped_epochs).expect(
            "epoch number is u64; it should be strictly smaller than number of slots; \
            slots relate in some way to wall clock time; \
            if u64 is not enough we should crash for safety; qed.",
        );

        let start_slot = skipped_epochs
            .checked_mul(epoch.duration)
            .and_then(|skipped_slots| epoch.start_slot.checked_add(skipped_slots))
            .expect(
                "slot number is u64; it should relate in some way to wall clock time; \
                if u64 is not enough we should crash for safety; qed.",
            );

        epoch.epoch_index = epoch_index;
        epoch.start_slot = Slot::from(start_slot);

        epoch
    }
}
253
/// Errors encountered by the BABE authorship and verification logic.
#[derive(Debug, thiserror::Error)]
pub enum Error<B: BlockT> {
    /// More than one BABE pre-runtime digest in a single header.
    #[error("Multiple BABE pre-runtime digests, rejecting!")]
    MultiplePreRuntimeDigests,
    /// Header is missing the mandatory BABE pre-runtime digest.
    #[error("No BABE pre-runtime digest found")]
    NoPreRuntimeDigest,
    /// More than one epoch change digest in a single header.
    #[error("Multiple BABE epoch change digests, rejecting!")]
    MultipleEpochChangeDigests,
    /// More than one config change digest in a single header.
    #[error("Multiple BABE config change digests, rejecting!")]
    MultipleConfigChangeDigests,
    /// Failed to extract timestamp and slot from the inherent data providers.
    #[error("Could not extract timestamp and slot: {0}")]
    Extraction(ConsensusError),
    /// Epoch data could not be fetched for the given block hash.
    #[error("Could not fetch epoch at {0:?}")]
    FetchEpoch(B::Hash),
    /// Header's slot lies too far in the future relative to our clock.
    #[error("Header {0:?} rejected: too far in the future")]
    TooFarInFuture(B::Hash),
    /// Parent block is not available, import is impossible.
    #[error("Parent ({0}) of {1} unavailable. Cannot import")]
    ParentUnavailable(B::Hash, B::Hash),
    /// Child block's slot is not strictly greater than the parent's.
    #[error("Slot number must increase: parent slot: {0}, this slot: {1}")]
    SlotMustIncrease(Slot, Slot),
    /// The header's seal digest is malformed.
    #[error("Header {0:?} has a bad seal")]
    HeaderBadSeal(B::Hash),
    /// The header carries no seal digest at all.
    #[error("Header {0:?} is unsealed")]
    HeaderUnsealed(B::Hash),
    /// No authority could be resolved for the claimed slot.
    #[error("Slot author not found")]
    SlotAuthorNotFound,
    /// A secondary-slot claim was made while secondary slots are disabled.
    #[error("Secondary slot assignments are disabled for the current epoch.")]
    SecondarySlotAssignmentsDisabled,
    /// The seal signature does not verify against the claimed authority.
    #[error("Bad signature on {0:?}")]
    BadSignature(B::Hash),
    /// The claimed author does not match the expected secondary author.
    #[error("Invalid author: Expected secondary author: {0:?}, got: {1:?}.")]
    InvalidAuthor(AuthorityId, AuthorityId),
    /// A secondary claim was found where none is expected.
    #[error("No secondary author expected.")]
    NoSecondaryAuthorExpected,
    /// The VRF signature/proof failed verification.
    #[error("VRF verification failed")]
    VrfVerificationFailed,
    /// The VRF output did not fall under the winning threshold.
    #[error("VRF output rejected, threshold {0} exceeded")]
    VrfThresholdExceeded(u128),
    /// Fetching the parent header from the backend failed.
    #[error("Could not fetch parent header: {0}")]
    FetchParentHeader(soil_client::blockchain::Error),
    /// An epoch change was due at this block but no digest announced it.
    #[error("Expected epoch change to happen at {0:?}, s{1}")]
    ExpectedEpochChange(B::Hash, Slot),
    /// A config change digest appeared where none is allowed.
    #[error("Unexpected config change")]
    UnexpectedConfigChange,
    /// An epoch change digest appeared where none is allowed.
    #[error("Unexpected epoch change")]
    UnexpectedEpochChange,
    /// The parent block has no BABE weight recorded in aux storage.
    #[error("Parent block of {0} has no associated weight")]
    ParentBlockNoAssociatedWeight(B::Hash),
    /// Inherent checking reported a fatal error.
    #[error("Checking inherents failed: {0}")]
    CheckInherents(subsoil::inherents::Error),
    /// Inherent checking reported an error this module cannot interpret.
    #[error("Checking inherents unhandled error: {}", String::from_utf8_lossy(.0))]
    CheckInherentsUnhandled(subsoil::inherents::InherentIdentifier),
    /// Inherent data creation failed.
    #[error("Creating inherents failed: {0}")]
    CreateInherents(subsoil::inherents::Error),
    /// The background request-answering worker has shut down.
    #[error("Background worker is not running")]
    BackgroundWorkerTerminated,
    /// Wrapped client/blockchain error.
    #[error(transparent)]
    Client(soil_client::blockchain::Error),
    /// Wrapped runtime API error.
    #[error(transparent)]
    RuntimeApi(subsoil::api::ApiError),
    /// Wrapped fork-tree error (boxed: the payload is large).
    #[error(transparent)]
    ForkTree(Box<soil_fork_tree::Error<soil_client::blockchain::Error>>),
}
348
// Render the error through its `Display` implementation, for callers that
// want a plain string (e.g. the verifier's `Result<_, String>` interface).
impl<B: BlockT> From<Error<B>> for String {
    fn from(error: Error<B>) -> String {
        error.to_string()
    }
}
354
/// Log the given error at debug level and return it unchanged; used to make
/// sure rejections leave a trace in the logs at the point they occur.
fn babe_err<B: BlockT>(error: Error<B>) -> Error<B> {
    debug!(target: LOG_TARGET, "{}", error);
    error
}
359
/// Intermediate value attached to a block during import, carrying the epoch
/// descriptor resolved at verification time so import need not re-derive it.
pub struct BabeIntermediate<B: BlockT> {
    /// The epoch descriptor under which this block was verified.
    pub epoch_descriptor: ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
}
365
366pub static INTERMEDIATE_KEY: &[u8] = b"babe1";
368
369pub fn configuration<B: BlockT, C>(client: &C) -> ClientResult<BabeConfiguration>
371where
372 C: AuxStore + ProvideRuntimeApi<B> + UsageProvider<B>,
373 C::Api: BabeApi<B>,
374{
375 let at_hash = if client.usage_info().chain.finalized_state.is_some() {
376 client.usage_info().chain.best_hash
377 } else {
378 debug!(target: LOG_TARGET, "No finalized state is available. Reading config from genesis");
379 client.usage_info().chain.genesis_hash
380 };
381
382 let runtime_api = client.runtime_api();
383 let version = runtime_api.api_version::<dyn BabeApi<B>>(at_hash)?;
384
385 let config = match version {
386 Some(1) => {
387 #[allow(deprecated)]
388 {
389 runtime_api.configuration_before_version_2(at_hash)?.into()
390 }
391 },
392 Some(2) => runtime_api.configuration(at_hash)?,
393 _ => {
394 return Err(soil_client::blockchain::Error::VersionInvalid(
395 "Unsupported or invalid BabeApi version".to_string(),
396 ))
397 },
398 };
399 Ok(config)
400}
401
/// Parameters for starting a BABE authorship worker via [`start_babe`].
pub struct BabeParams<B: BlockT, C, SC, E, I, SO, L, CIDP, BS> {
    /// The keystore holding the authority keys used for slot claims and seals.
    pub keystore: KeystorePtr,

    /// The client providing runtime API and header access.
    pub client: Arc<C>,

    /// The chain-selection strategy used to pick the block to author on.
    pub select_chain: SC,

    /// The environment used to create block proposers.
    pub env: E,

    /// The block import to which produced blocks are supplied.
    pub block_import: I,

    /// Oracle reporting the node's sync state.
    pub sync_oracle: SO,

    /// Hook into the sync module controlling justification sync.
    pub justification_sync_link: L,

    /// Factory for the inherent data providers used during authoring.
    pub create_inherent_data_providers: CIDP,

    /// Author blocks even when the sync oracle says we are offline/syncing.
    pub force_authoring: bool,

    /// Optional strategy for backing off block production.
    pub backoff_authoring_blocks: Option<BS>,

    /// Shared BABE state (epoch-change tree and configuration).
    pub babe_link: BabeLink<B>,

    /// Portion of the slot dedicated to proposing (the rest is left for
    /// propagation/import).
    pub block_proposal_slot_portion: SlotProportion,

    /// Optional cap on the proposal portion when slot lenience is applied.
    pub max_block_proposal_slot_portion: Option<SlotProportion>,

    /// Optional handle to the telemetry service.
    pub telemetry: Option<TelemetryHandle>,
}
453
/// Start the BABE authorship worker.
///
/// Wires the supplied components into a [`BabeSlotWorker`], wraps it in the
/// generic slot-worker driver, and returns a [`BabeWorker`] future that must
/// be polled (e.g. spawned on an executor) for block production to happen.
pub fn start_babe<B, C, SC, E, I, SO, CIDP, BS, L, Error>(
    BabeParams {
        keystore,
        client,
        select_chain,
        env,
        block_import,
        sync_oracle,
        justification_sync_link,
        create_inherent_data_providers,
        force_authoring,
        backoff_authoring_blocks,
        babe_link,
        block_proposal_slot_portion,
        max_block_proposal_slot_portion,
        telemetry,
    }: BabeParams<B, C, SC, E, I, SO, L, CIDP, BS>,
) -> Result<BabeWorker<B>, ConsensusError>
where
    B: BlockT,
    C: ProvideRuntimeApi<B>
        + HeaderBackend<B>
        + HeaderMetadata<B, Error = ClientError>
        + Send
        + Sync
        + 'static,
    C::Api: BabeApi<B>,
    SC: SelectChain<B> + 'static,
    E: Environment<B, Error = Error> + Send + Sync + 'static,
    E::Proposer: Proposer<B, Error = Error>,
    I: BlockImport<B, Error = ConsensusError> + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + Clone + 'static,
    L: soil_client::import::JustificationSyncLink<B> + 'static,
    CIDP: CreateInherentDataProviders<B, ()> + Send + Sync + 'static,
    CIDP::InherentDataProviders: InherentDataProviderExt + Send,
    BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync + 'static,
    Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
    // Sinks registered later via `BabeWorker::slot_notification_stream`.
    let slot_notification_sinks = Arc::new(Mutex::new(Vec::new()));

    let worker = BabeSlotWorker {
        client: client.clone(),
        block_import,
        env,
        sync_oracle: sync_oracle.clone(),
        justification_sync_link,
        force_authoring,
        backoff_authoring_blocks,
        keystore,
        epoch_changes: babe_link.epoch_changes.clone(),
        slot_notification_sinks: slot_notification_sinks.clone(),
        config: babe_link.config.clone(),
        block_proposal_slot_portion,
        max_block_proposal_slot_portion,
        telemetry,
    };

    info!(target: LOG_TARGET, "👶 Starting BABE Authorship worker");

    // The generic slot driver ticks once per slot and delegates the BABE
    // specifics (claiming, proposing, sealing) to our worker.
    let slot_worker = soil_consensus::slots::start_slot_worker(
        babe_link.config.slot_duration(),
        select_chain,
        soil_consensus::slots::SimpleSlotWorkerToSlotWorker(worker),
        sync_oracle,
        create_inherent_data_providers,
    );

    Ok(BabeWorker { inner: Box::pin(slot_worker), slot_notification_sinks })
}
524
525fn aux_storage_cleanup<C: HeaderMetadata<Block> + HeaderBackend<Block>, Block: BlockT>(
529 client: &C,
530 notification: &FinalityNotification<Block>,
531) -> AuxDataOperations {
532 let mut hashes = HashSet::new();
533
534 let first = notification.tree_route.first().unwrap_or(¬ification.hash);
535 match client.header_metadata(*first) {
536 Ok(meta) => {
537 hashes.insert(meta.parent);
538 },
539 Err(err) => {
540 warn!(target: LOG_TARGET, "Failed to lookup metadata for block `{:?}`: {}", first, err,)
541 },
542 }
543
544 hashes.extend(
546 notification
547 .tree_route
548 .iter()
549 .filter(|h| **h != notification.hash),
552 );
553
554 hashes.extend(notification.stale_blocks.iter().map(|b| b.hash));
555
556 hashes
557 .into_iter()
558 .map(|val| (aux_schema::block_weight_key(val), None))
559 .collect()
560}
561
/// Background task servicing [`BabeRequest`]s until the request channel
/// closes, answering epoch-data queries against the shared epoch-change tree.
async fn answer_requests<B: BlockT, C>(
    mut request_rx: Receiver<BabeRequest<B>>,
    config: BabeConfiguration,
    client: Arc<C>,
    epoch_changes: SharedEpochChanges<B, Epoch>,
) where
    C: HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
{
    while let Some(request) = request_rx.next().await {
        match request {
            BabeRequest::EpochData(response) => {
                // Snapshot of the whole epoch-change tree.
                let _ = response.send(epoch_changes.shared_data().clone());
            },
            BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, response) => {
                let lookup = || {
                    let epoch_changes = epoch_changes.shared_data();
                    epoch_changes
                        .epoch_data_for_child_of(
                            descendent_query(&*client),
                            &parent_hash,
                            parent_number,
                            slot,
                            // Bootstrap the genesis epoch on demand.
                            |slot| Epoch::genesis(&config, slot),
                        )
                        .map_err(|e| Error::<B>::ForkTree(Box::new(e)))?
                        .ok_or(Error::<B>::FetchEpoch(parent_hash))
                };

                // A send error only means the requester went away; ignore it.
                let _ = response.send(lookup());
            },
        }
    }
}
595
/// Requests understood by the background worker in [`answer_requests`].
enum BabeRequest<B: BlockT> {
    /// Request a clone of the full epoch-change tree.
    EpochData(oneshot::Sender<EpochChangesFor<B, Epoch>>),
    /// Request the epoch that a child of (hash, number) at `slot` belongs to.
    EpochDataForChildOf(B::Hash, NumberFor<B>, Slot, oneshot::Sender<Result<Epoch, Error<B>>>),
}
605
/// A cloneable handle for sending requests to the background BABE worker.
#[derive(Clone)]
pub struct BabeWorkerHandle<B: BlockT>(Sender<BabeRequest<B>>);
609
impl<B: BlockT> BabeWorkerHandle<B> {
    /// Send a request to the background worker.
    ///
    /// A disconnected channel means the worker is gone and is reported as
    /// `BackgroundWorkerTerminated`; any other send error is only logged.
    async fn send_request(&self, request: BabeRequest<B>) -> Result<(), Error<B>> {
        match self.0.clone().send(request).await {
            Err(err) if err.is_disconnected() => return Err(Error::BackgroundWorkerTerminated),
            Err(err) => warn!(
                target: LOG_TARGET,
                "Unhandled error when sending request to worker: {:?}", err
            ),
            _ => {},
        }

        Ok(())
    }

    /// Fetch a snapshot of the full epoch-change tree from the worker.
    pub async fn epoch_data(&self) -> Result<EpochChangesFor<B, Epoch>, Error<B>> {
        let (tx, rx) = oneshot::channel();
        self.send_request(BabeRequest::EpochData(tx)).await?;

        // A cancelled oneshot also means the worker terminated.
        rx.await.or(Err(Error::BackgroundWorkerTerminated))
    }

    /// Fetch the epoch a child block of `parent_hash`/`parent_number`
    /// authored at `slot` would belong to.
    pub async fn epoch_data_for_child_of(
        &self,
        parent_hash: B::Hash,
        parent_number: NumberFor<B>,
        slot: Slot,
    ) -> Result<Epoch, Error<B>> {
        let (tx, rx) = oneshot::channel();
        self.send_request(BabeRequest::EpochDataForChildOf(parent_hash, parent_number, slot, tx))
            .await?;

        rx.await.or(Err(Error::BackgroundWorkerTerminated))?
    }
}
648
/// The BABE authorship task. Does nothing unless polled to completion,
/// hence `#[must_use]`.
#[must_use]
pub struct BabeWorker<B: BlockT> {
    // The boxed slot-worker future driving block production.
    inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
    // Sinks that receive a notification at the start of each slot.
    slot_notification_sinks: SlotNotificationSinks<B>,
}
655
impl<B: BlockT> BabeWorker<B> {
    /// Return a stream of (slot, epoch descriptor) notifications, one per
    /// slot the worker processes. The sender side is registered with the
    /// worker's notification sinks.
    pub fn slot_notification_stream(
        &self,
    ) -> Receiver<(Slot, ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>)> {
        // Bounded so a stalled subscriber cannot grow memory without limit.
        const CHANNEL_BUFFER_SIZE: usize = 1024;

        let (sink, stream) = channel(CHANNEL_BUFFER_SIZE);
        self.slot_notification_sinks.lock().push(sink);
        stream
    }
}
669
// Polling the worker simply drives the boxed inner slot-worker future.
impl<B: BlockT> Future for BabeWorker<B> {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
        self.inner.as_mut().poll(cx)
    }
}
677
/// Shared list of per-subscriber senders used to broadcast slot notifications
/// (populated by `BabeWorker::slot_notification_stream`).
type SlotNotificationSinks<B> = Arc<
    Mutex<Vec<Sender<(Slot, ViableEpochDescriptor<<B as BlockT>::Hash, NumberFor<B>, Epoch>)>>>,
>;
682
/// Per-slot state and collaborators of the BABE authorship worker; the logic
/// lives in its `SimpleSlotWorker` implementation below.
struct BabeSlotWorker<B: BlockT, C, E, I, SO, L, BS> {
    client: Arc<C>,
    block_import: I,
    env: E,
    sync_oracle: SO,
    justification_sync_link: L,
    // When true, author even while offline/syncing.
    force_authoring: bool,
    backoff_authoring_blocks: Option<BS>,
    keystore: KeystorePtr,
    epoch_changes: SharedEpochChanges<B, Epoch>,
    slot_notification_sinks: SlotNotificationSinks<B>,
    config: BabeConfiguration,
    block_proposal_slot_portion: SlotProportion,
    max_block_proposal_slot_portion: Option<SlotProportion>,
    telemetry: Option<TelemetryHandle>,
}
699
#[async_trait::async_trait]
impl<B, C, E, I, Error, SO, L, BS> soil_consensus::slots::SimpleSlotWorker<B>
    for BabeSlotWorker<B, C, E, I, SO, L, BS>
where
    B: BlockT,
    C: ProvideRuntimeApi<B> + HeaderBackend<B> + HeaderMetadata<B, Error = ClientError>,
    C::Api: BabeApi<B>,
    E: Environment<B, Error = Error> + Send + Sync,
    E::Proposer: Proposer<B, Error = Error>,
    I: BlockImport<B> + Send + Sync + 'static,
    SO: SyncOracle + Send + Clone + Sync,
    L: soil_client::import::JustificationSyncLink<B>,
    BS: BackoffAuthoringBlocksStrategy<NumberFor<B>> + Send + Sync,
    Error: std::error::Error + Send + From<ConsensusError> + From<I::Error> + 'static,
{
    // A slot claim: the pre-digest to embed plus the authority that signs.
    type Claim = (PreDigest, AuthorityId);
    type SyncOracle = SO;
    type JustificationSyncLink = L;
    type CreateProposer =
        Pin<Box<dyn Future<Output = Result<E::Proposer, ConsensusError>> + Send + 'static>>;
    type Proposer = E::Proposer;
    type BlockImport = I;
    // Aux data is the epoch descriptor under which the claim is made.
    type AuxData = ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>;

    fn logging_target(&self) -> &'static str {
        LOG_TARGET
    }

    fn block_import(&mut self) -> &mut Self::BlockImport {
        &mut self.block_import
    }

    /// Resolve the epoch descriptor for a child of `parent` at `slot`.
    fn aux_data(&self, parent: &B::Header, slot: Slot) -> Result<Self::AuxData, ConsensusError> {
        self.epoch_changes
            .shared_data()
            .epoch_descriptor_for_child_of(
                descendent_query(&*self.client),
                &parent.hash(),
                *parent.number(),
                slot,
            )
            .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
            .ok_or(ConsensusError::InvalidAuthoritiesSet)
    }

    /// Number of authorities in the epoch described by `epoch_descriptor`,
    /// if the epoch can be materialized.
    fn authorities_len(&self, epoch_descriptor: &Self::AuxData) -> Option<usize> {
        self.epoch_changes
            .shared_data()
            .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
            .map(|epoch| epoch.as_ref().authorities.len())
    }

    /// Attempt to claim `slot` with one of our keystore keys under the
    /// epoch given by `epoch_descriptor`.
    async fn claim_slot(
        &mut self,
        _parent_header: &B::Header,
        slot: Slot,
        epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
    ) -> Option<Self::Claim> {
        debug!(target: LOG_TARGET, "Attempting to claim slot {}", slot);
        let s = authorship::claim_slot(
            slot,
            self.epoch_changes
                .shared_data()
                .viable_epoch(epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))?
                .as_ref(),
            &self.keystore,
        );

        if s.is_some() {
            debug!(target: LOG_TARGET, "Claimed slot {}", slot);
        }

        s
    }

    /// Fan the slot notification out to all registered sinks. Full sinks are
    /// kept (with a warning); closed sinks are dropped.
    fn notify_slot(
        &self,
        _parent_header: &B::Header,
        slot: Slot,
        epoch_descriptor: &ViableEpochDescriptor<B::Hash, NumberFor<B>, Epoch>,
    ) {
        let sinks = &mut self.slot_notification_sinks.lock();
        sinks.retain_mut(|sink| match sink.try_send((slot, epoch_descriptor.clone())) {
            Ok(()) => true,
            Err(e) => {
                if e.is_full() {
                    warn!(target: LOG_TARGET, "Trying to notify a slot but the channel is full");
                    true
                } else {
                    false
                }
            },
        });
    }

    /// The only pre-runtime digest BABE adds is the claim's pre-digest.
    fn pre_digest_data(
        &self,
        _slot: Slot,
        claim: &Self::Claim,
    ) -> Vec<subsoil::runtime::DigestItem> {
        vec![<DigestItem as CompatibleDigestItem>::babe_pre_digest(claim.0.clone())]
    }

    /// Sign the post-hash with the claiming authority's key and assemble the
    /// import params (body, state changes, seal digest, epoch intermediate).
    async fn block_import_params(
        &self,
        header: B::Header,
        header_hash: &B::Hash,
        body: Vec<B::Extrinsic>,
        storage_changes: StorageChanges<B>,
        (_, public): Self::Claim,
        epoch_descriptor: Self::AuxData,
    ) -> Result<BlockImportParams<B>, ConsensusError> {
        let signature = self
            .keystore
            .sr25519_sign(<AuthorityId as AppCrypto>::ID, public.as_ref(), header_hash.as_ref())
            .map_err(|e| ConsensusError::CannotSign(format!("{}. Key: {:?}", e, public)))?
            .ok_or_else(|| {
                ConsensusError::CannotSign(format!(
                    "Could not find key in keystore. Key: {:?}",
                    public
                ))
            })?;

        let digest_item = <DigestItem as CompatibleDigestItem>::babe_seal(signature.into());

        let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
        import_block.post_digests.push(digest_item);
        import_block.body = Some(body);
        import_block.state_action = StateAction::ApplyChanges(
            soil_client::import::StorageChanges::Changes(storage_changes),
        );
        // Stash the epoch descriptor so the block import need not re-derive it.
        import_block
            .insert_intermediate(INTERMEDIATE_KEY, BabeIntermediate::<B> { epoch_descriptor });

        Ok(import_block)
    }

    fn force_authoring(&self) -> bool {
        self.force_authoring
    }

    /// Consult the configured backoff strategy (if any), feeding it the
    /// parent's slot extracted from its pre-digest.
    fn should_backoff(&self, slot: Slot, chain_head: &B::Header) -> bool {
        if let Some(ref strategy) = self.backoff_authoring_blocks {
            if let Ok(chain_head_slot) =
                find_pre_digest::<B>(chain_head).map(|digest| digest.slot())
            {
                return strategy.should_backoff(
                    *chain_head.number(),
                    chain_head_slot,
                    self.client.info().finalized_number,
                    slot,
                    self.logging_target(),
                );
            }
        }
        false
    }

    fn sync_oracle(&mut self) -> &mut Self::SyncOracle {
        &mut self.sync_oracle
    }

    fn justification_sync_link(&mut self) -> &mut Self::JustificationSyncLink {
        &mut self.justification_sync_link
    }

    fn proposer(&mut self, block: &B::Header) -> Self::CreateProposer {
        Box::pin(self.env.init(block).map_err(|e| ConsensusError::ClientImport(e.to_string())))
    }

    fn telemetry(&self) -> Option<TelemetryHandle> {
        self.telemetry.clone()
    }

    /// Remaining proposal time for this slot, with exponential lenience
    /// applied when the parent's slot is far behind.
    fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
        let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());

        soil_consensus::slots::proposing_remaining_duration(
            parent_slot,
            slot_info,
            &self.block_proposal_slot_portion,
            self.max_block_proposal_slot_portion.as_ref(),
            soil_consensus::slots::SlotLenienceType::Exponential,
            self.logging_target(),
        )
    }
}
887
888pub fn find_pre_digest<B: BlockT>(header: &B::Header) -> Result<PreDigest, Error<B>> {
891 if header.number().is_zero() {
894 return Ok(PreDigest::SecondaryPlain(SecondaryPlainPreDigest {
895 slot: 0.into(),
896 authority_index: 0,
897 }));
898 }
899
900 let mut pre_digest: Option<_> = None;
901 for log in header.digest().logs() {
902 trace!(target: LOG_TARGET, "Checking log {:?}, looking for pre runtime digest", log);
903 match (log.as_babe_pre_digest(), pre_digest.is_some()) {
904 (Some(_), true) => return Err(babe_err(Error::MultiplePreRuntimeDigests)),
905 (None, _) => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
906 (s, false) => pre_digest = s,
907 }
908 }
909 pre_digest.ok_or_else(|| babe_err(Error::NoPreRuntimeDigest))
910}
911
912pub fn contains_epoch_change<B: BlockT>(header: &B::Header) -> bool {
914 find_next_epoch_digest::<B>(header).ok().flatten().is_some()
915}
916
917pub fn find_next_epoch_digest<B: BlockT>(
919 header: &B::Header,
920) -> Result<Option<NextEpochDescriptor>, Error<B>> {
921 let mut epoch_digest: Option<_> = None;
922 for log in header.digest().logs() {
923 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
924 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
925 match (log, epoch_digest.is_some()) {
926 (Some(ConsensusLog::NextEpochData(_)), true) => {
927 return Err(babe_err(Error::MultipleEpochChangeDigests))
928 },
929 (Some(ConsensusLog::NextEpochData(epoch)), false) => epoch_digest = Some(epoch),
930 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
931 }
932 }
933
934 Ok(epoch_digest)
935}
936
937fn find_next_config_digest<B: BlockT>(
939 header: &B::Header,
940) -> Result<Option<NextConfigDescriptor>, Error<B>> {
941 let mut config_digest: Option<_> = None;
942 for log in header.digest().logs() {
943 trace!(target: LOG_TARGET, "Checking log {:?}, looking for epoch change digest.", log);
944 let log = log.try_to::<ConsensusLog>(OpaqueDigestItemId::Consensus(&BABE_ENGINE_ID));
945 match (log, config_digest.is_some()) {
946 (Some(ConsensusLog::NextConfigData(_)), true) => {
947 return Err(babe_err(Error::MultipleConfigChangeDigests))
948 },
949 (Some(ConsensusLog::NextConfigData(config)), false) => config_digest = Some(config),
950 _ => trace!(target: LOG_TARGET, "Ignoring digest not meant for us"),
951 }
952 }
953
954 Ok(config_digest)
955}
956
/// Shared BABE state — the epoch-change tree and the chain's BABE
/// configuration — handed from import-queue setup to the authoring worker.
#[derive(Clone)]
pub struct BabeLink<Block: BlockT> {
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    config: BabeConfiguration,
}
963
impl<Block: BlockT> BabeLink<Block> {
    /// Get the shared epoch-change tree of this link.
    pub fn epoch_changes(&self) -> &SharedEpochChanges<Block, Epoch> {
        &self.epoch_changes
    }

    /// Get the BABE configuration of this link.
    pub fn config(&self) -> &BabeConfiguration {
        &self.config
    }
}
975
/// Verifier for blocks produced under BABE: checks the pre-digest, seal and
/// epoch membership of incoming headers before import.
pub struct BabeVerifier<Block: BlockT, Client> {
    client: Arc<Client>,
    // Used to compute the current slot from wall-clock time.
    slot_duration: SlotDuration,
    config: BabeConfiguration,
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    telemetry: Option<TelemetryHandle>,
}
984
#[async_trait::async_trait]
impl<Block, Client> Verifier<Block> for BabeVerifier<Block, Client>
where
    Block: BlockT,
    Client: HeaderMetadata<Block, Error = soil_client::blockchain::Error>
        + HeaderBackend<Block>
        + ProvideRuntimeApi<Block>
        + Send
        + Sync
        + AuxStore,
    Client::Api: BlockBuilderApi<Block> + BabeApi<Block>,
{
    /// Verify an incoming block: resolve its epoch from the pre-digest slot,
    /// check the header (including the seal), and on success strip the seal
    /// into `post_digests` and attach the epoch descriptor as an
    /// intermediate for the block import.
    async fn verify(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<BlockImportParams<Block>, String> {
        trace!(
            target: LOG_TARGET,
            "Verifying origin: {:?} header: {:?} justification(s): {:?} body: {:?}",
            block.origin,
            block.header,
            block.justifications,
            block.body,
        );

        let hash = block.header.hash();
        let parent_hash = *block.header.parent_hash();

        let number = block.header.number();

        // Warp-synced blocks, state imports and gap-range blocks skip
        // verification entirely.
        if should_skip_verification(&*self.client, &block) {
            return Ok(block);
        }

        debug!(
            target: LOG_TARGET,
            "We have {:?} logs in this header",
            block.header.digest().logs().len()
        );

        let slot_now = Slot::from_timestamp(Timestamp::current(), self.slot_duration);

        let pre_digest = find_pre_digest::<Block>(&block.header)?;
        let (check_header, epoch_descriptor) = {
            let (epoch_descriptor, viable_epoch) = query_epoch_changes(
                &self.epoch_changes,
                self.client.as_ref(),
                &self.config,
                *number,
                pre_digest.slot(),
                parent_hash,
            )?;

            let v_params = verification::VerificationParams {
                header: block.header.clone(),
                pre_digest: Some(pre_digest),
                // Allow one slot of clock drift.
                slot_now: slot_now + 1,
                epoch: viable_epoch.as_ref(),
            };

            (verification::check_header::<Block>(v_params)?, epoch_descriptor)
        };

        match check_header {
            CheckedHeader::Checked(pre_header, verified_info) => {
                trace!(target: LOG_TARGET, "Checked {:?}; importing.", pre_header);
                telemetry!(
                    self.telemetry;
                    CONSENSUS_TRACE;
                    "babe.checked_and_importing";
                    "pre_header" => ?pre_header,
                );

                // Replace the sealed header with the pre-seal header and move
                // the seal into the post-digests.
                block.header = pre_header;
                block.post_digests.push(verified_info.seal);
                block.insert_intermediate(
                    INTERMEDIATE_KEY,
                    BabeIntermediate::<Block> { epoch_descriptor },
                );
                block.post_hash = Some(hash);

                Ok(block)
            },
            CheckedHeader::Deferred(a, b) => {
                // The header's slot is ahead of our clock: reject for now.
                debug!(target: LOG_TARGET, "Checking {:?} failed; {:?}, {:?}.", hash, a, b);
                telemetry!(
                    self.telemetry;
                    CONSENSUS_DEBUG;
                    "babe.header_too_far_in_future";
                    "hash" => ?hash, "a" => ?a, "b" => ?b
                );
                Err(Error::<Block>::TooFarInFuture(hash).into())
            },
        }
    }
}
1083
1084fn should_skip_verification<B: BlockT>(
1091 client: &impl HeaderBackend<B>,
1092 block: &BlockImportParams<B>,
1093) -> bool {
1094 block.origin == BlockOrigin::WarpSync || block.with_state() || {
1095 let number = *block.header.number();
1096 let info = client.info();
1097 info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
1098 }
1099}
1100
/// A block-import wrapper for BABE. Delegates actual import to `inner` while
/// maintaining the epoch-change tree and BABE aux data alongside it.
pub struct BabeBlockImport<Block: BlockT, Client, I, CIDP, SC> {
    inner: I,
    client: Arc<Client>,
    epoch_changes: SharedEpochChanges<Block, Epoch>,
    // Used to build inherent data when checking imported blocks.
    create_inherent_data_providers: CIDP,
    config: BabeConfiguration,
    select_chain: SC,
    offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
}
1125
// Manual Clone: a derive would also require `Client: Clone`, but the client
// is held behind an `Arc` and only the `Arc` needs cloning.
impl<Block: BlockT, I: Clone, Client, CIDP: Clone, SC: Clone> Clone
    for BabeBlockImport<Block, Client, I, CIDP, SC>
{
    fn clone(&self) -> Self {
        BabeBlockImport {
            inner: self.inner.clone(),
            client: self.client.clone(),
            epoch_changes: self.epoch_changes.clone(),
            config: self.config.clone(),
            create_inherent_data_providers: self.create_inherent_data_providers.clone(),
            select_chain: self.select_chain.clone(),
            offchain_tx_pool_factory: self.offchain_tx_pool_factory.clone(),
        }
    }
}
1141
impl<Block: BlockT, Client, I, CIDP, SC> BabeBlockImport<Block, Client, I, CIDP, SC> {
    /// Construct a new `BabeBlockImport` wrapping the given inner
    /// `block_import` and sharing the given epoch-change tree.
    fn new(
        client: Arc<Client>,
        epoch_changes: SharedEpochChanges<Block, Epoch>,
        block_import: I,
        config: BabeConfiguration,
        create_inherent_data_providers: CIDP,
        select_chain: SC,
        offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
    ) -> Self {
        BabeBlockImport {
            client,
            inner: block_import,
            epoch_changes,
            config,
            create_inherent_data_providers,
            select_chain,
            offchain_tx_pool_factory,
        }
    }
}
1163
1164impl<Block, Client, Inner, CIDP, SC> BabeBlockImport<Block, Client, Inner, CIDP, SC>
1165where
1166 Block: BlockT,
1167 Inner: BlockImport<Block> + Send + Sync,
1168 Inner::Error: Into<ConsensusError>,
1169 Client: HeaderBackend<Block>
1170 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1171 + AuxStore
1172 + ProvideRuntimeApi<Block>
1173 + Send
1174 + Sync,
1175 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1176 CIDP: CreateInherentDataProviders<Block, ()>,
1177 CIDP::InherentDataProviders: InherentDataProviderExt + Send,
1178 SC: soil_client::consensus::SelectChain<Block> + 'static,
1179{
    /// Import a block that carries its full state, then rebuild the
    /// epoch-change tree from the runtime of the imported block and persist
    /// it to aux storage.
    async fn import_state(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<ImportResult, ConsensusError> {
        let hash = block.post_hash();
        let parent_hash = *block.header.parent_hash();
        let number = *block.header.number();

        // Force fork-choice in favor of this block and record a zero weight
        // for it in aux storage.
        block.fork_choice = Some(ForkChoiceStrategy::Custom(true));
        aux_schema::write_block_weight(hash, 0, |values| {
            block
                .auxiliary
                .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
        });

        // Delegate to the inner import; only a plain `Imported` result is
        // acceptable for a state import.
        let import_result = self.inner.import_block(block).await;
        let aux = match import_result {
            Ok(ImportResult::Imported(aux)) => aux,
            Ok(r) => {
                return Err(ConsensusError::ClientImport(format!(
                    "Unexpected import result: {:?}",
                    r
                )))
            },
            Err(r) => return Err(r.into()),
        };

        // Read the current and next epochs out of the just-imported state.
        let current_epoch = self.client.runtime_api().current_epoch(hash).map_err(|e| {
            ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
        })?;
        let next_epoch = self.client.runtime_api().next_epoch(hash).map_err(|e| {
            ConsensusError::ClientImport(babe_err::<Block>(Error::RuntimeApi(e)).into())
        })?;

        // Restart epoch tracking from this block and persist the fresh tree.
        let mut epoch_changes = self.epoch_changes.shared_data_locked();
        epoch_changes.reset(parent_hash, hash, number, current_epoch.into(), next_epoch.into());
        aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
            self.client.insert_aux(insert, [])
        })
        .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

        Ok(ImportResult::Imported(aux))
    }
1229
1230 async fn check_inherents_and_equivocations(
1232 &self,
1233 block: &mut BlockImportParams<Block>,
1234 ) -> Result<(), ConsensusError> {
1235 if should_skip_verification(&*self.client, block) {
1236 return Ok(());
1237 }
1238
1239 let parent_hash = *block.header.parent_hash();
1240 let number = *block.header.number();
1241
1242 let create_inherent_data_providers = self
1243 .create_inherent_data_providers
1244 .create_inherent_data_providers(parent_hash, ())
1245 .await?;
1246
1247 let slot_now = create_inherent_data_providers.slot();
1248
1249 let babe_pre_digest = find_pre_digest::<Block>(&block.header)
1250 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1251 let slot = babe_pre_digest.slot();
1252
1253 self.check_inherents(block, parent_hash, slot, create_inherent_data_providers)
1255 .await?;
1256
1257 let author = {
1259 let viable_epoch = query_epoch_changes(
1260 &self.epoch_changes,
1261 self.client.as_ref(),
1262 &self.config,
1263 number,
1264 slot,
1265 parent_hash,
1266 )
1267 .map_err(|e| ConsensusError::Other(babe_err(e).into()))?
1268 .1;
1269 match viable_epoch
1270 .as_ref()
1271 .authorities
1272 .get(babe_pre_digest.authority_index() as usize)
1273 {
1274 Some(author) => author.0.clone(),
1275 None => {
1276 return Err(ConsensusError::Other(Error::<Block>::SlotAuthorNotFound.into()))
1277 },
1278 }
1279 };
1280 if let Err(err) = self
1281 .check_and_report_equivocation(slot_now, slot, &block.header, &author, &block.origin)
1282 .await
1283 {
1284 warn!(
1285 target: LOG_TARGET,
1286 "Error checking/reporting BABE equivocation: {}", err
1287 );
1288 }
1289 Ok(())
1290 }
1291
1292 async fn check_inherents(
1293 &self,
1294 block: &mut BlockImportParams<Block>,
1295 at_hash: Block::Hash,
1296 slot: Slot,
1297 create_inherent_data_providers: CIDP::InherentDataProviders,
1298 ) -> Result<(), ConsensusError> {
1299 if block.state_action.skip_execution_checks() {
1300 return Ok(());
1301 }
1302
1303 if let Some(inner_body) = block.body.take() {
1304 let new_block = Block::new(block.header.clone(), inner_body);
1305 let mut inherent_data = create_inherent_data_providers
1309 .create_inherent_data()
1310 .await
1311 .map_err(|e| ConsensusError::Other(Box::new(e)))?;
1312 inherent_data.babe_replace_inherent_data(slot);
1313
1314 use subsoil::block_builder::CheckInherentsError;
1315
1316 subsoil::block_builder::check_inherents_with_data(
1317 self.client.clone(),
1318 at_hash,
1319 new_block.clone(),
1320 &create_inherent_data_providers,
1321 inherent_data,
1322 )
1323 .await
1324 .map_err(|e| {
1325 ConsensusError::Other(Box::new(match e {
1326 CheckInherentsError::CreateInherentData(e) => {
1327 Error::<Block>::CreateInherents(e)
1328 },
1329 CheckInherentsError::Client(e) => Error::RuntimeApi(e),
1330 CheckInherentsError::CheckInherents(e) => Error::CheckInherents(e),
1331 CheckInherentsError::CheckInherentsUnknownError(id) => {
1332 Error::CheckInherentsUnhandled(id)
1333 },
1334 }))
1335 })?;
1336 let (_, inner_body) = new_block.deconstruct();
1337 block.body = Some(inner_body);
1338 }
1339
1340 Ok(())
1341 }
1342
1343 async fn check_and_report_equivocation(
1344 &self,
1345 slot_now: Slot,
1346 slot: Slot,
1347 header: &Block::Header,
1348 author: &AuthorityId,
1349 origin: &BlockOrigin,
1350 ) -> Result<(), Error<Block>> {
1351 if *origin == BlockOrigin::NetworkInitialSync {
1354 return Ok(());
1355 }
1356
1357 let Some(equivocation_proof) =
1359 check_equivocation(&*self.client, slot_now, slot, header, author)
1360 .map_err(Error::Client)?
1361 else {
1362 return Ok(());
1363 };
1364
1365 info!(
1366 target: LOG_TARGET,
1367 "Slot author {:?} is equivocating at slot {} with headers {:?} and {:?}",
1368 author,
1369 slot,
1370 equivocation_proof.first_header.hash(),
1371 equivocation_proof.second_header.hash(),
1372 );
1373
1374 let best_hash = self
1376 .select_chain
1377 .best_chain()
1378 .await
1379 .map(|h| h.hash())
1380 .map_err(|e| Error::Client(e.into()))?;
1381
1382 let generate_key_owner_proof = |at_hash: Block::Hash| {
1391 self.client
1392 .runtime_api()
1393 .generate_key_ownership_proof(at_hash, slot, equivocation_proof.offender.clone())
1394 .map_err(Error::RuntimeApi)
1395 };
1396
1397 let parent_hash = *header.parent_hash();
1398 let key_owner_proof = match generate_key_owner_proof(parent_hash)? {
1399 Some(proof) => proof,
1400 None => match generate_key_owner_proof(best_hash)? {
1401 Some(proof) => proof,
1402 None => {
1403 debug!(
1404 target: LOG_TARGET,
1405 "Equivocation offender is not part of the authority set."
1406 );
1407 return Ok(());
1408 },
1409 },
1410 };
1411
1412 let mut runtime_api = self.client.runtime_api();
1414
1415 runtime_api
1417 .register_extension(self.offchain_tx_pool_factory.offchain_transaction_pool(best_hash));
1418
1419 runtime_api
1420 .submit_report_equivocation_unsigned_extrinsic(
1421 best_hash,
1422 equivocation_proof,
1423 key_owner_proof,
1424 )
1425 .map_err(Error::RuntimeApi)?;
1426
1427 info!(target: LOG_TARGET, "Submitted equivocation report for author {:?}", author);
1428
1429 Ok(())
1430 }
1431}
1432
#[async_trait::async_trait]
impl<Block, Client, Inner, CIDP, SC> BlockImport<Block>
    for BabeBlockImport<Block, Client, Inner, CIDP, SC>
where
    Block: BlockT,
    Inner: BlockImport<Block> + Send + Sync,
    Inner::Error: Into<ConsensusError>,
    Client: HeaderBackend<Block>
        + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
        + AuxStore
        + ProvideRuntimeApi<Block>
        + Send
        + Sync,
    Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
    CIDP: CreateInherentDataProviders<Block, ()>,
    CIDP::InherentDataProviders: InherentDataProviderExt + Send + Sync,
    SC: SelectChain<Block> + 'static,
{
    type Error = ConsensusError;

    /// Import `block`, maintaining BABE's epoch-change tree and block-weight
    /// auxiliary data, and deciding fork choice by accumulated BABE weight.
    async fn import_block(
        &self,
        mut block: BlockImportParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        let hash = block.post_hash();
        let parent_hash = *block.header.parent_hash();
        let number = *block.header.number();
        let info = self.client.info();

        self.check_inherents_and_equivocations(&mut block).await?;

        let block_status = self
            .client
            .status(hash)
            .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

        // Blocks that fill a known sync gap, or that are already in the
        // chain, bypass all BABE bookkeeping below.
        if info.block_gap.map_or(false, |gap| gap.start <= number && number <= gap.end)
            || block_status == BlockStatus::InChain
        {
            // Strip the BABE intermediate (if present) and never treat the
            // re-imported block as new best.
            let _ = block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY);
            block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
            return self.inner.import_block(block).await.map_err(Into::into);
        }

        // Blocks carrying full state take the dedicated state-import path.
        if block.with_state() {
            return self.import_state(block).await;
        }

        let pre_digest = find_pre_digest::<Block>(&block.header).expect(
            "valid babe headers must contain a predigest; header has been already verified; qed",
        );
        let slot = pre_digest.slot();

        // Snapshot of the epoch tree taken before mutating it (only when an
        // epoch change is applied); used to roll back on import failure.
        let mut old_epoch_changes = None;

        // For non-warp-sync blocks, validate slot progression and apply any
        // epoch change carried by the header. Warp-synced blocks skip this
        // and are never treated as new best.
        let epoch_changes = if block.origin != BlockOrigin::WarpSync {
            let parent_header = self
                .client
                .header(parent_hash)
                .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
                .ok_or_else(|| {
                    ConsensusError::ChainLookup(
                        babe_err(Error::<Block>::ParentUnavailable(parent_hash, hash)).into(),
                    )
                })?;

            let parent_slot = find_pre_digest::<Block>(&parent_header).map(|d| d.slot()).expect(
                "parent is non-genesis; valid BABE headers contain a pre-digest; header has already \
                been verified; qed",
            );

            // Slots must strictly increase along a chain.
            if slot <= parent_slot {
                return Err(ConsensusError::ClientImport(
                    babe_err(Error::<Block>::SlotMustIncrease(parent_slot, slot)).into(),
                ));
            }

            // Hold the epoch-change lock across the epoch update and the
            // inner import so concurrent imports observe a consistent tree.
            let mut epoch_changes = self.epoch_changes.shared_data_locked();

            let (epoch_descriptor, first_in_epoch, parent_weight) = {
                // Genesis parents carry weight 0; otherwise the parent's
                // accumulated weight must already be recorded in aux storage.
                let parent_weight = if *parent_header.number() == Zero::zero() {
                    0
                } else {
                    aux_schema::load_block_weight(&*self.client, parent_hash)
                        .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ClientImport(
                                babe_err(Error::<Block>::ParentBlockNoAssociatedWeight(hash))
                                    .into(),
                            )
                        })?
                };

                // The verifier stashed the epoch descriptor as an
                // intermediate; consume it here.
                let intermediate =
                    block.remove_intermediate::<BabeIntermediate<Block>>(INTERMEDIATE_KEY)?;

                let epoch_descriptor = intermediate.epoch_descriptor;
                let first_in_epoch = parent_slot < epoch_descriptor.start_slot();
                (epoch_descriptor, first_in_epoch, parent_weight)
            };

            let total_weight = parent_weight + pre_digest.added_weight();

            let next_epoch_digest = find_next_epoch_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
            let next_config_digest = find_next_config_digest::<Block>(&block.header)
                .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;

            // Epoch/config digests are only allowed (and required) on the
            // first block of a new epoch; reject every other combination.
            match (first_in_epoch, next_epoch_digest.is_some(), next_config_digest.is_some()) {
                (true, true, _) => {},
                (false, false, false) => {},
                (false, false, true) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedConfigChange).into(),
                    ))
                },
                (true, false, _) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::ExpectedEpochChange(hash, slot)).into(),
                    ))
                },
                (false, true, _) => {
                    return Err(ConsensusError::ClientImport(
                        babe_err(Error::<Block>::UnexpectedEpochChange).into(),
                    ))
                },
            }

            if let Some(next_epoch_descriptor) = next_epoch_digest {
                // Keep a copy of the tree so a failed import can restore it.
                old_epoch_changes = Some((*epoch_changes).clone());

                let mut viable_epoch = epoch_changes
                    .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&self.config, slot))
                    .ok_or_else(|| {
                        ConsensusError::ClientImport(Error::<Block>::FetchEpoch(parent_hash).into())
                    })?
                    .into_cloned();

                // A config digest overrides the epoch config; otherwise the
                // previous epoch's config is carried over.
                let epoch_config = next_config_digest
                    .map(Into::into)
                    .unwrap_or_else(|| viable_epoch.as_ref().config.clone());

                // Keep the log quiet during initial sync where epoch changes
                // arrive in bulk.
                let log_level = if block.origin == BlockOrigin::NetworkInitialSync {
                    log::Level::Debug
                } else {
                    log::Level::Info
                };

                // The block's slot lies beyond the epoch's end: one or more
                // whole epochs elapsed without blocks. Re-base the epoch onto
                // the block's slot before incrementing.
                if viable_epoch.as_ref().end_slot() <= slot {
                    let epoch = viable_epoch.as_mut();
                    let prev_index = epoch.epoch_index;
                    *epoch = epoch.clone_for_slot(slot);

                    warn!(
                        target: LOG_TARGET,
                        "👶 Epoch(s) skipped: from {} to {}", prev_index, epoch.epoch_index,
                    );
                }

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 New epoch {} launching at block {} (block slot {} >= start slot {}).",
                    viable_epoch.as_ref().epoch_index,
                    hash,
                    slot,
                    viable_epoch.as_ref().start_slot,
                );

                let next_epoch = viable_epoch.increment((next_epoch_descriptor, epoch_config));

                log!(
                    target: LOG_TARGET,
                    log_level,
                    "👶 Next epoch starts at slot {}",
                    next_epoch.as_ref().start_slot,
                );

                // Prune entries below the last finalized block, then graft
                // the announced next epoch into the tree.
                let prune_and_import = || {
                    prune_finalized(self.client.clone(), &mut epoch_changes)?;

                    epoch_changes
                        .import(
                            descendent_query(&*self.client),
                            hash,
                            number,
                            *block.header.parent_hash(),
                            next_epoch,
                        )
                        .map_err(|e| {
                            ConsensusError::ClientImport(format!(
                                "Error importing epoch changes: {}",
                                e
                            ))
                        })?;
                    Ok(())
                };

                // On failure, restore the snapshot so the in-memory tree
                // stays consistent with what is persisted.
                if let Err(e) = prune_and_import() {
                    debug!(target: LOG_TARGET, "Failed to launch next epoch: {}", e);
                    *epoch_changes =
                        old_epoch_changes.expect("set `Some` above and not taken; qed");
                    return Err(e);
                }

                // Persist the updated tree alongside the block via its aux data.
                crate::aux_schema::write_epoch_changes::<Block, _, _>(&*epoch_changes, |insert| {
                    block
                        .auxiliary
                        .extend(insert.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
                });
            }

            // Record this block's accumulated weight in its aux data.
            aux_schema::write_block_weight(hash, total_weight, |values| {
                block
                    .auxiliary
                    .extend(values.iter().map(|(k, v)| (k.to_vec(), Some(v.to_vec()))))
            });

            // Fork choice: prefer the chain with the most primary slots
            // (highest total weight); tie-break on height.
            block.fork_choice = {
                let (last_best, last_best_number) = (info.best_hash, info.best_number);

                let last_best_weight = if &last_best == block.header.parent_hash() {
                    // The parent is the current best: reuse its weight.
                    parent_weight
                } else {
                    // NOTE(review): this error message says "parent header"
                    // but the lookup is for the current best block — confirm
                    // intended wording.
                    aux_schema::load_block_weight(&*self.client, last_best)
                        .map_err(|e| ConsensusError::ChainLookup(e.to_string()))?
                        .ok_or_else(|| {
                            ConsensusError::ChainLookup(
                                "No block weight for parent header.".to_string(),
                            )
                        })?
                };

                Some(ForkChoiceStrategy::Custom(if total_weight > last_best_weight {
                    true
                } else if total_weight == last_best_weight {
                    number > last_best_number
                } else {
                    false
                }))
            };

            // Release the lock but keep an upgradeable handle so the tree can
            // be rolled back if the inner import fails below.
            Some(epoch_changes.release_mutex())
        } else {
            block.fork_choice = Some(ForkChoiceStrategy::Custom(false));
            None
        };

        let import_result = self.inner.import_block(block).await;

        // Revert to the original epoch changes in case there's an error
        // importing the block.
        if import_result.is_err() {
            if let (Some(mut epoch_changes), Some(old_epoch_changes)) =
                (epoch_changes, old_epoch_changes)
            {
                *epoch_changes.upgrade() = old_epoch_changes;
            }
        }

        import_result.map_err(Into::into)
    }

    /// Delegate block checking to the wrapped import.
    async fn check_block(
        &self,
        block: BlockCheckParams<Block>,
    ) -> Result<ImportResult, Self::Error> {
        self.inner.check_block(block).await.map_err(Into::into)
    }
}
1746
1747fn prune_finalized<Block, Client>(
1749 client: Arc<Client>,
1750 epoch_changes: &mut EpochChangesFor<Block, Epoch>,
1751) -> Result<(), ConsensusError>
1752where
1753 Block: BlockT,
1754 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
1755{
1756 let info = client.info();
1757
1758 let finalized_slot = {
1759 let finalized_header = client
1760 .header(info.finalized_hash)
1761 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?
1762 .expect(
1763 "best finalized hash was given by client; finalized headers must exist in db; qed",
1764 );
1765
1766 find_pre_digest::<Block>(&finalized_header)
1767 .expect("finalized header must be valid; valid blocks have a pre-digest; qed")
1768 .slot()
1769 };
1770
1771 epoch_changes
1772 .prune_finalized(
1773 descendent_query(&*client),
1774 &info.finalized_hash,
1775 info.finalized_number,
1776 finalized_slot,
1777 )
1778 .map_err(|e| ConsensusError::ClientImport(e.to_string()))?;
1779
1780 Ok(())
1781}
1782
1783pub fn block_import<Client, Block: BlockT, I, CIDP, SC>(
1789 config: BabeConfiguration,
1790 wrapped_block_import: I,
1791 client: Arc<Client>,
1792 create_inherent_data_providers: CIDP,
1793 select_chain: SC,
1794 offchain_tx_pool_factory: OffchainTransactionPoolFactory<Block>,
1795) -> ClientResult<(BabeBlockImport<Block, Client, I, CIDP, SC>, BabeLink<Block>)>
1796where
1797 Client: AuxStore
1798 + HeaderBackend<Block>
1799 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1800 + PreCommitActions<Block>
1801 + 'static,
1802{
1803 let epoch_changes = aux_schema::load_epoch_changes::<Block, _>(&*client, &config)?;
1804 let link = BabeLink { epoch_changes: epoch_changes.clone(), config: config.clone() };
1805
1806 prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
1810
1811 let client_weak = Arc::downgrade(&client);
1812 let on_finality = move |summary: &FinalityNotification<Block>| {
1813 if let Some(client) = client_weak.upgrade() {
1814 aux_storage_cleanup(client.as_ref(), summary)
1815 } else {
1816 Default::default()
1817 }
1818 };
1819 client.register_finality_action(Box::new(on_finality));
1820
1821 let import = BabeBlockImport::new(
1822 client,
1823 epoch_changes,
1824 wrapped_block_import,
1825 config,
1826 create_inherent_data_providers,
1827 select_chain,
1828 offchain_tx_pool_factory,
1829 );
1830
1831 Ok((import, link))
1832}
1833
/// Parameters passed to [`import_queue`].
pub struct ImportQueueParams<'a, Block: BlockT, BI, Client, Spawn> {
    /// The BABE link as produced by [`block_import`].
    pub link: BabeLink<Block>,
    /// The block import to wrap in the queue's verifier pipeline.
    pub block_import: BI,
    /// Optional justification import.
    pub justification_import: Option<BoxJustificationImport<Block>>,
    /// The client used to interact with the chain.
    pub client: Arc<Client>,
    /// The slot duration of the chain.
    pub slot_duration: SlotDuration,
    /// Spawner for the essential background worker task.
    pub spawner: &'a Spawn,
    /// Optional Prometheus registry for queue metrics.
    pub registry: Option<&'a Registry>,
    /// Optional telemetry handle used by the verifier.
    pub telemetry: Option<TelemetryHandle>,
}
1853
1854pub fn import_queue<Block: BlockT, Client, BI, Spawn>(
1864 ImportQueueParams {
1865 link: babe_link,
1866 block_import,
1867 justification_import,
1868 client,
1869 slot_duration,
1870 spawner,
1871 registry,
1872 telemetry,
1873 }: ImportQueueParams<'_, Block, BI, Client, Spawn>,
1874) -> ClientResult<(DefaultImportQueue<Block>, BabeWorkerHandle<Block>)>
1875where
1876 BI: BlockImport<Block, Error = ConsensusError> + Send + Sync + 'static,
1877 Client: ProvideRuntimeApi<Block>
1878 + HeaderBackend<Block>
1879 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1880 + AuxStore
1881 + Send
1882 + Sync
1883 + 'static,
1884 Client::Api: BlockBuilderApi<Block> + BabeApi<Block> + ApiExt<Block>,
1885 Spawn: SpawnEssentialNamed,
1886{
1887 const HANDLE_BUFFER_SIZE: usize = 1024;
1888
1889 let verifier = BabeVerifier {
1890 slot_duration,
1891 config: babe_link.config.clone(),
1892 epoch_changes: babe_link.epoch_changes.clone(),
1893 telemetry,
1894 client: client.clone(),
1895 };
1896
1897 let (worker_tx, worker_rx) = channel(HANDLE_BUFFER_SIZE);
1898
1899 let answer_requests =
1900 answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes);
1901
1902 spawner.spawn_essential("babe-worker", Some("babe"), answer_requests.boxed());
1903
1904 Ok((
1905 BasicQueue::new(verifier, Box::new(block_import), justification_import, spawner, registry),
1906 BabeWorkerHandle(worker_tx),
1907 ))
1908}
1909
1910pub fn revert<Block, Client, Backend>(
1914 client: Arc<Client>,
1915 backend: Arc<Backend>,
1916 blocks: NumberFor<Block>,
1917) -> ClientResult<()>
1918where
1919 Block: BlockT,
1920 Client: AuxStore
1921 + HeaderMetadata<Block, Error = soil_client::blockchain::Error>
1922 + HeaderBackend<Block>
1923 + ProvideRuntimeApi<Block>
1924 + UsageProvider<Block>,
1925 Client::Api: BabeApi<Block>,
1926 Backend: BackendT<Block>,
1927{
1928 let best_number = client.info().best_number;
1929 let finalized = client.info().finalized_number;
1930
1931 let revertible = blocks.min(best_number - finalized);
1932 if revertible == Zero::zero() {
1933 return Ok(());
1934 }
1935
1936 let revert_up_to_number = best_number - revertible;
1937 let revert_up_to_hash = client.hash(revert_up_to_number)?.ok_or(ClientError::Backend(
1938 format!("Unexpected hash lookup failure for block number: {}", revert_up_to_number),
1939 ))?;
1940
1941 let config = configuration(&*client)?;
1945 let epoch_changes = aux_schema::load_epoch_changes::<Block, Client>(&*client, &config)?;
1946 let mut epoch_changes = epoch_changes.shared_data();
1947
1948 if revert_up_to_number == Zero::zero() {
1949 *epoch_changes = EpochChangesFor::<Block, Epoch>::default();
1951 } else {
1952 epoch_changes.revert(descendent_query(&*client), revert_up_to_hash, revert_up_to_number);
1953 }
1954
1955 let mut weight_keys = HashSet::with_capacity(revertible.saturated_into());
1958
1959 let leaves = backend.blockchain().leaves()?.into_iter().filter(|&leaf| {
1960 soil_client::blockchain::tree_route(&*client, revert_up_to_hash, leaf)
1961 .map(|route| route.retracted().is_empty())
1962 .unwrap_or_default()
1963 });
1964
1965 for leaf in leaves {
1966 let mut hash = leaf;
1967 loop {
1968 let meta = client.header_metadata(hash)?;
1969 if meta.number <= revert_up_to_number
1970 || !weight_keys.insert(aux_schema::block_weight_key(hash))
1971 {
1972 break;
1974 }
1975 hash = meta.parent;
1976 }
1977 }
1978
1979 let weight_keys: Vec<_> = weight_keys.iter().map(|val| val.as_slice()).collect();
1980
1981 aux_schema::write_epoch_changes::<Block, _, _>(&epoch_changes, |values| {
1983 client.insert_aux(values, weight_keys.iter())
1984 })
1985}
1986
1987fn query_epoch_changes<Block, Client>(
1988 epoch_changes: &SharedEpochChanges<Block, Epoch>,
1989 client: &Client,
1990 config: &BabeConfiguration,
1991 block_number: NumberFor<Block>,
1992 slot: Slot,
1993 parent_hash: Block::Hash,
1994) -> Result<
1995 (ViableEpochDescriptor<Block::Hash, NumberFor<Block>, Epoch>, ViableEpoch<Epoch>),
1996 Error<Block>,
1997>
1998where
1999 Block: BlockT,
2000 Client: HeaderBackend<Block> + HeaderMetadata<Block, Error = soil_client::blockchain::Error>,
2001{
2002 let epoch_changes = epoch_changes.shared_data();
2003 let epoch_descriptor = epoch_changes
2004 .epoch_descriptor_for_child_of(
2005 descendent_query(client),
2006 &parent_hash,
2007 block_number - 1u32.into(),
2008 slot,
2009 )
2010 .map_err(|e| Error::<Block>::ForkTree(Box::new(e)))?
2011 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2012 let viable_epoch = epoch_changes
2013 .viable_epoch(&epoch_descriptor, |slot| Epoch::genesis(&config, slot))
2014 .ok_or(Error::<Block>::FetchEpoch(parent_hash))?;
2015 Ok((epoch_descriptor, viable_epoch.into_cloned()))
2016}