use codec::Codec;
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{
self as consensus_common, ParachainBlockImportMarker, ParachainCandidate,
};
use cumulus_client_parachain_inherent::{ParachainInherentData, ParachainInherentDataProvider};
use cumulus_primitives_core::{
relay_chain::Hash as PHash, DigestItem, ParachainBlockData, PersistedValidationData,
RelayProofRequest,
};
use cumulus_relay_chain_interface::RelayChainInterface;
use sc_client_api::BackendTransaction;
use sp_consensus::{Environment, ProposeArgs, Proposer};
use polkadot_node_primitives::{Collation, MaybeCompressedPoV};
use polkadot_primitives::{Header as PHeader, Id as ParaId};
use sp_externalities::Extensions;
use sp_trie::proof_size_extension::ProofSizeExt;
use crate::collators::RelayParentData;
use futures::prelude::*;
use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction};
use sc_consensus_aura::standalone as aura_internal;
use sc_network_types::PeerId;
use sp_api::{ApiExt, ProofRecorder, ProvideRuntimeApi, StorageProof};
use sp_application_crypto::AppPublic;
use sp_consensus::BlockOrigin;
use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
use sp_core::crypto::Pair;
use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
use sp_keystore::KeystorePtr;
use sp_runtime::{
generic::Digest,
traits::{Block as BlockT, HashingFor, Header as HeaderT, Member},
};
use sp_state_machine::StorageChanges;
use sp_timestamp::Timestamp;
use std::{error::Error, time::Duration};
/// Parameters required to construct a [`Collator`].
pub struct Params<BI, CIDP, RClient, PF, CS> {
    /// Creates the inherent data providers that complement the parachain-system inherent.
    pub create_inherent_data_providers: CIDP,
    /// Block import handle used to import blocks after sealing.
    pub block_import: BI,
    /// Interface to the relay chain; used when building the parachain inherent data.
    pub relay_client: RClient,
    /// Keystore holding the authoring keys; used to seal proposed blocks.
    pub keystore: KeystorePtr,
    /// Network peer id of this collator.
    // NOTE(review): not stored on `Collator` by `new`; callers pass a peer id
    // explicitly to the inherent-data helpers — confirm this field is still needed.
    pub collator_peer_id: PeerId,
    /// Identifier of the parachain this collator produces blocks for.
    pub para_id: ParaId,
    /// Proposer factory (`Environment`) used to author blocks.
    pub proposer: PF,
    /// Collator service used to turn built blocks into collations.
    pub collator_service: CS,
}
/// Parameters for [`Collator::build_block_and_import`].
pub struct BuildBlockAndImportParams<'a, Block: BlockT, P: Pair> {
    /// Header of the parent block to build on top of.
    pub parent_header: &'a Block::Header,
    /// The claimed slot; its pre-digest is appended to the block digests and its
    /// author key is used to seal the block.
    pub slot_claim: &'a SlotClaim<P::Public>,
    /// Extra pre-runtime digest items, pushed before the slot-claim pre-digest.
    pub additional_pre_digest: Vec<DigestItem>,
    /// The parachain-system inherent data, merged into the combined inherent data.
    pub parachain_inherent_data: ParachainInherentData,
    /// Additional inherent data the block is built with.
    pub extra_inherent_data: InherentData,
    /// Maximum duration the proposer may spend building the block.
    pub proposal_duration: Duration,
    /// Block size limit handed to the proposer (the PoV budget).
    pub max_pov_size: usize,
    /// Optional externally-owned storage proof recorder; a fresh one is created
    /// when `None`. Must be `Some` whenever `extra_extensions` already contains a
    /// `ProofSizeExt`.
    pub storage_proof_recorder: Option<ProofRecorder<Block>>,
    /// Extra externalities extensions registered for block building.
    pub extra_extensions: Extensions,
}
/// A block built by [`Collator::build_block_and_import`], together with its
/// storage proof and backend transaction.
pub struct BuiltBlock<Block: BlockT> {
    /// The sealed block.
    pub block: Block,
    /// The storage proof recorded while building the block.
    pub proof: StorageProof,
    /// The backend database transaction of the block's state changes.
    pub backend_transaction: BackendTransaction<HashingFor<Block>>,
}
impl<Block: BlockT> From<BuiltBlock<Block>> for ParachainCandidate<Block> {
fn from(built: BuiltBlock<Block>) -> Self {
Self { block: built.block, proof: built.proof }
}
}
/// A utility for authoring and importing parachain blocks and turning them into
/// collations.
pub struct Collator<Block, P, BI, CIDP, RClient, PF, CS> {
    // Creates the non-parachain inherent data providers.
    create_inherent_data_providers: CIDP,
    // Block import handle used after sealing.
    block_import: BI,
    // Interface to the relay chain.
    relay_client: RClient,
    // Keystore holding the authoring keys.
    keystore: KeystorePtr,
    // The parachain this collator builds for.
    para_id: ParaId,
    // Proposer factory used to author blocks.
    proposer: PF,
    // Service used to package built blocks into collations.
    collator_service: CS,
    // Ties `Block` and `P` to the struct without storing values of either.
    // NOTE(review): the `Box<dyn Fn(P) + Send + Sync>` presumably keeps the
    // struct's auto traits independent of `P` — confirm.
    _marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
}
impl<Block, P, BI, CIDP, RClient, PF, CS> Collator<Block, P, BI, CIDP, RClient, PF, CS>
where
Block: BlockT,
RClient: RelayChainInterface,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
PF: Environment<Block>,
CS: CollatorServiceInterface<Block>,
P: Pair,
P::Public: AppPublic + Member,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
pub fn new(params: Params<BI, CIDP, RClient, PF, CS>) -> Self {
Collator {
create_inherent_data_providers: params.create_inherent_data_providers,
block_import: params.block_import,
relay_client: params.relay_client,
keystore: params.keystore,
para_id: params.para_id,
proposer: params.proposer,
collator_service: params.collator_service,
_marker: std::marker::PhantomData,
}
}
pub async fn create_inherent_data_with_rp_offset(
&self,
relay_parent: PHash,
validation_data: &PersistedValidationData,
parent_hash: Block::Hash,
timestamp: impl Into<Option<Timestamp>>,
relay_parent_descendants: Option<RelayParentData>,
relay_proof_request: RelayProofRequest,
collator_peer_id: PeerId,
) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
let paras_inherent_data = ParachainInherentDataProvider::create_at(
relay_parent,
&self.relay_client,
validation_data,
self.para_id,
relay_parent_descendants
.map(RelayParentData::into_inherent_descendant_list)
.unwrap_or_default(),
relay_proof_request,
collator_peer_id,
)
.await;
let paras_inherent_data = match paras_inherent_data {
Some(p) => p,
None => {
return Err(
format!("Could not create paras inherent data at {:?}", relay_parent).into()
)
},
};
let mut other_inherent_data = self
.create_inherent_data_providers
.create_inherent_data_providers(parent_hash, ())
.map_err(|e| e as Box<dyn Error + Send + Sync + 'static>)
.await?
.create_inherent_data()
.await
.map_err(Box::new)?;
if let Some(timestamp) = timestamp.into() {
other_inherent_data.replace_data(sp_timestamp::INHERENT_IDENTIFIER, ×tamp);
}
Ok((paras_inherent_data, other_inherent_data))
}
pub async fn create_inherent_data(
&self,
relay_parent: PHash,
validation_data: &PersistedValidationData,
parent_hash: Block::Hash,
timestamp: impl Into<Option<Timestamp>>,
relay_proof_request: RelayProofRequest,
collator_peer_id: PeerId,
) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
self.create_inherent_data_with_rp_offset(
relay_parent,
validation_data,
parent_hash,
timestamp,
None,
relay_proof_request,
collator_peer_id,
)
.await
}
pub async fn build_block_and_import(
&mut self,
mut params: BuildBlockAndImportParams<'_, Block, P>,
) -> Result<Option<BuiltBlock<Block>>, Box<dyn Error + Send + 'static>> {
let mut digest = params.additional_pre_digest;
digest.push(params.slot_claim.pre_digest.clone());
let proposer = self
.proposer
.init(¶ms.parent_header)
.await
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
let mut inherent_data_combined = params.extra_inherent_data;
params
.parachain_inherent_data
.provide_inherent_data(&mut inherent_data_combined)
.await
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
let recorder_passed = params.storage_proof_recorder.is_some();
let storage_proof_recorder = params.storage_proof_recorder.unwrap_or_default();
let proof_size_ext_registered =
params.extra_extensions.is_registered(ProofSizeExt::type_id());
if !proof_size_ext_registered {
params
.extra_extensions
.register(ProofSizeExt::new(storage_proof_recorder.clone()));
} else if proof_size_ext_registered && !recorder_passed {
return Err(
Box::from("`ProofSizeExt` registered, but no `storage_proof_recorder` provided. This is a bug.")
as Box<dyn Error + Send + Sync>
);
}
let propose_args = ProposeArgs {
inherent_data: inherent_data_combined,
inherent_digests: Digest { logs: digest },
max_duration: params.proposal_duration,
block_size_limit: Some(params.max_pov_size),
extra_extensions: params.extra_extensions,
storage_proof_recorder: Some(storage_proof_recorder.clone()),
};
let proposal = proposer
.propose(propose_args)
.await
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;
let backend_transaction = proposal.storage_changes.transaction.clone();
let sealed_importable = seal::<_, P>(
proposal.block,
proposal.storage_changes,
¶ms.slot_claim.author_pub,
&self.keystore,
)
.map_err(|e| e as Box<dyn Error + Send>)?;
let block = Block::new(
sealed_importable.post_header(),
sealed_importable
.body
.as_ref()
.expect("body always created with this `propose` fn; qed")
.clone(),
);
self.block_import
.import_block(sealed_importable)
.map_err(|e| Box::new(e) as Box<dyn Error + Send>)
.await?;
let proof = storage_proof_recorder.drain_storage_proof();
Ok(Some(BuiltBlock { block, proof, backend_transaction }))
}
pub async fn collate(
&mut self,
parent_header: &Block::Header,
slot_claim: &SlotClaim<P::Public>,
additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
inherent_data: (ParachainInherentData, InherentData),
proposal_duration: Duration,
max_pov_size: usize,
) -> Result<Option<(Collation, ParachainBlockData<Block>)>, Box<dyn Error + Send + 'static>> {
let maybe_candidate = self
.build_block_and_import(BuildBlockAndImportParams {
parent_header,
slot_claim,
additional_pre_digest: additional_pre_digest.into().unwrap_or_default(),
parachain_inherent_data: inherent_data.0,
extra_inherent_data: inherent_data.1,
proposal_duration,
max_pov_size,
storage_proof_recorder: None,
extra_extensions: Default::default(),
})
.await?;
let Some(candidate) = maybe_candidate else { return Ok(None) };
let hash = candidate.block.header().hash();
if let Some((collation, block_data)) =
self.collator_service.build_collation(parent_header, hash, candidate.into())
{
block_data.log_size_info();
if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
tracing::info!(
target: crate::LOG_TARGET,
"Compressed PoV size: {}kb",
pov.block_data.0.len() as f64 / 1024f64,
);
}
Ok(Some((collation, block_data)))
} else {
Err(Box::<dyn Error + Send + Sync>::from("Unable to produce collation"))
}
}
pub fn collator_service(&self) -> &CS {
&self.collator_service
}
}
/// A claim on an Aura slot.
pub struct SlotClaim<Pub> {
    // Public key of the authority claiming the slot.
    author_pub: Pub,
    // Aura pre-runtime digest announcing the claimed slot.
    pre_digest: DigestItem,
    // The claimed slot.
    slot: Slot,
    // Timestamp corresponding to the claimed slot.
    timestamp: Timestamp,
}
impl<Pub> SlotClaim<Pub> {
    /// Create a slot claim from an author public key, a slot, and a timestamp,
    /// without checking that the author actually holds a key for the slot.
    pub fn unchecked<P>(author_pub: Pub, slot: Slot, timestamp: Timestamp) -> Self
    where
        P: Pair<Public = Pub>,
        P::Public: Codec,
        P::Signature: Codec,
    {
        let pre_digest = aura_internal::pre_digest::<P>(slot);
        Self { author_pub, pre_digest, slot, timestamp }
    }

    /// Public key of the claiming author.
    pub fn author_pub(&self) -> &Pub {
        &self.author_pub
    }

    /// Aura pre-runtime digest for the claimed slot.
    pub fn pre_digest(&self) -> &DigestItem {
        &self.pre_digest
    }

    /// The claimed slot.
    pub fn slot(&self) -> Slot {
        self.slot
    }

    /// Timestamp corresponding to the claimed slot.
    pub fn timestamp(&self) -> Timestamp {
        self.timestamp
    }
}
/// Attempt to claim the slot derived from the relay-parent timestamp.
///
/// Fetches the Aura authorities at `parent_hash`, converts the relay-chain slot
/// into a parachain slot, and checks the keystore for a matching authority key.
/// Returns `Ok(None)` when no slot can be derived or no local key can claim it.
pub async fn claim_slot<B, C, P>(
    client: &C,
    parent_hash: B::Hash,
    relay_parent_header: &PHeader,
    slot_duration: SlotDuration,
    relay_chain_slot_duration: Duration,
    keystore: &KeystorePtr,
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
where
    B: BlockT,
    C: ProvideRuntimeApi<B> + Send + Sync + 'static,
    C::Api: AuraApi<B, P::Public>,
    P: Pair,
    P::Public: Codec,
    P::Signature: Codec,
{
    // Query the authority set using the on-chain call context.
    let mut api = client.runtime_api();
    api.set_call_context(sp_core::traits::CallContext::Onchain);
    let authorities = api.authorities(parent_hash).map_err(Box::new)?;

    // Derive our parachain slot from the relay parent; bail out quietly when no
    // relay slot/timestamp can be computed.
    let Some((relay_slot, timestamp)) = consensus_common::relay_slot_and_timestamp(
        relay_parent_header,
        relay_chain_slot_duration,
    ) else {
        return Ok(None)
    };

    let slot_now = Slot::from_timestamp(timestamp, slot_duration);
    tracing::debug!(
        target: crate::LOG_TARGET,
        relay_slot = ?relay_slot,
        para_slot = ?slot_now,
        timestamp = ?timestamp,
        ?slot_duration,
        ?relay_chain_slot_duration,
        "Adjusted relay-chain slot to parachain slot"
    );

    // Check whether our keystore holds a key allowed to author in this slot.
    let Some(author_pub) =
        aura_internal::claim_slot::<P>(slot_now, &authorities, keystore).await
    else {
        return Ok(None)
    };

    Ok(Some(SlotClaim::unchecked::<P>(author_pub, slot_now, timestamp)))
}
/// Seal a pre-sealed block into [`BlockImportParams`] ready for import.
///
/// Signs the pre-seal hash with `author_pub` via the keystore, attaches the
/// resulting seal as a post-runtime digest, and packages the body and storage
/// changes with a longest-chain fork choice and `BlockOrigin::Own`.
pub fn seal<B: BlockT, P>(
    pre_sealed: B,
    storage_changes: StorageChanges<HashingFor<B>>,
    author_pub: &P::Public,
    keystore: &KeystorePtr,
) -> Result<BlockImportParams<B>, Box<dyn Error + Send + Sync + 'static>>
where
    P: Pair,
    P::Signature: Codec + TryFrom<Vec<u8>>,
    P::Public: AppPublic,
{
    let (pre_header, body) = pre_sealed.deconstruct();
    let pre_hash = pre_header.hash();
    let block_number = *pre_header.number();

    // Sign over the pre-seal hash and attach the seal as a post-digest.
    let seal_digest =
        aura_internal::seal::<_, P>(&pre_hash, author_pub, keystore).map_err(Box::new)?;

    let mut import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
    import_params.post_digests.push(seal_digest);
    import_params.body = Some(body);
    import_params.state_action =
        StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
    import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);

    let post_hash = import_params.post_hash();
    tracing::info!(
        target: crate::LOG_TARGET,
        "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
        block_number,
        post_hash,
        pre_hash,
    );

    Ok(import_params)
}