use codec::{Codec, Encode};
use cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface;
use cumulus_client_consensus_common::{
self as consensus_common, ParachainBlockImportMarker, ParentSearchParams,
};
use cumulus_client_consensus_proposer::ProposerInterface;
use cumulus_primitives_aura::AuraUnincludedSegmentApi;
use cumulus_primitives_core::{
relay_chain::Hash as PHash, CollectCollationInfo, PersistedValidationData,
};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_primitives::SubmitCollationParams;
use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};
use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
use sc_consensus::BlockImport;
use sc_consensus_aura::standalone as aura_internal;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus::SyncOracle;
use sp_consensus_aura::{AuraApi, Slot, SlotDuration};
use sp_core::crypto::Pair;
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use sp_timestamp::Timestamp;
use std::{convert::TryFrom, sync::Arc, time::Duration};
use crate::collator::{self as collator_util, SlotClaim};
/// Parameters for [`run`].
///
/// Bundles every handle and setting the collation loop reads. The generic parameters are
/// left unconstrained here; the bounds they must satisfy are stated on [`run`].
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, SO, Proposer, CS> {
/// Inherent data provider factory; `run` forwards it to the inner collator helper.
pub create_inherent_data_providers: CIDP,
/// Block import handle; `run` forwards it to the inner collator helper.
pub block_import: BI,
/// Parachain client; `run` uses it for the runtime API calls made while claiming a slot.
pub para_client: Arc<Client>,
/// Parachain backend; `run` passes it to the potential-parent search.
pub para_backend: Arc<Backend>,
/// Relay chain interface; `run` uses it for the import notification stream, the persisted
/// validation data lookup and the potential-parent search.
pub relay_client: RClient,
/// Queried by `run` via `code_hash_at(parent_hash)` for the validation code hash that is
/// attached to each submitted collation.
pub code_hash_provider: CHP,
/// Sync oracle. NOTE(review): not read by the `run` body in this file — confirm whether it
/// is still needed.
pub sync_oracle: SO,
/// Keystore handed to the slot-claiming logic and to the inner collator helper.
pub keystore: KeystorePtr,
/// Collator key passed to `initialize_collator_subsystems` when `run` starts.
pub collator_key: CollatorPair,
/// Parachain id used for relay chain queries and subsystem initialization.
pub para_id: ParaId,
/// Overseer handle; `run` sends `SubmitCollation` messages through it.
pub overseer_handle: OverseerHandle,
/// Slot duration used by `run` to turn the timestamp derived from the relay parent into a
/// parachain slot.
pub slot_duration: SlotDuration,
/// Relay chain slot duration, passed to `relay_slot_and_timestamp` together with the relay
/// parent header.
pub relay_chain_slot_duration: Duration,
/// Proposer; `run` forwards it to the inner collator helper.
pub proposer: Proposer,
/// Collator service; `run` forwards it to the inner collator helper, which later uses it to
/// announce freshly built blocks.
pub collator_service: CS,
/// Duration passed to `collate` for every block — presumably the authoring time budget;
/// confirm against `collator_util::Collator::collate`.
pub authoring_duration: Duration,
}
pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, SO, Proposer, CS>(
mut params: Params<BI, CIDP, Client, Backend, RClient, CHP, SO, Proposer, CS>,
) -> impl Future<Output = ()> + Send + 'static
where
Block: BlockT,
Client: ProvideRuntimeApi<Block>
+ BlockOf
+ AuxStore
+ HeaderBackend<Block>
+ BlockBackend<Block>
+ Send
+ Sync
+ 'static,
Client::Api:
AuraApi<Block, P::Public> + CollectCollationInfo<Block> + AuraUnincludedSegmentApi<Block>,
Backend: sc_client_api::Backend<Block> + 'static,
RClient: RelayChainInterface + Clone + 'static,
CIDP: CreateInherentDataProviders<Block, ()> + 'static,
CIDP::InherentDataProviders: Send,
BI: BlockImport<Block> + ParachainBlockImportMarker + Send + Sync + 'static,
SO: SyncOracle + Send + Sync + Clone + 'static,
Proposer: ProposerInterface<Block> + Send + Sync + 'static,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
P: Pair,
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
const PARENT_SEARCH_DEPTH: usize = 10;
async move {
cumulus_client_collator::initialize_collator_subsystems(
&mut params.overseer_handle,
params.collator_key,
params.para_id,
)
.await;
let mut import_notifications = match params.relay_client.import_notification_stream().await
{
Ok(s) => s,
Err(err) => {
tracing::error!(
target: crate::LOG_TARGET,
?err,
"Failed to initialize consensus: no relay chain import notification stream"
);
return
},
};
let mut collator = {
let params = collator_util::Params {
create_inherent_data_providers: params.create_inherent_data_providers,
block_import: params.block_import,
relay_client: params.relay_client.clone(),
keystore: params.keystore.clone(),
para_id: params.para_id,
proposer: params.proposer,
collator_service: params.collator_service,
};
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};
while let Some(relay_parent_header) = import_notifications.next().await {
let relay_parent = relay_parent_header.hash();
let max_pov_size = match params
.relay_client
.persisted_validation_data(
relay_parent,
params.para_id,
OccupiedCoreAssumption::Included,
)
.await
{
Ok(None) => continue,
Ok(Some(pvd)) => pvd.max_pov_size,
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
continue
},
};
let (slot_now, timestamp) = match consensus_common::relay_slot_and_timestamp(
&relay_parent_header,
params.relay_chain_slot_duration,
) {
None => continue,
Some((r_s, t)) => {
let our_slot = Slot::from_timestamp(t, params.slot_duration);
tracing::debug!(
target: crate::LOG_TARGET,
relay_slot = ?r_s,
para_slot = ?our_slot,
timestamp = ?t,
slot_duration = ?params.slot_duration,
relay_chain_slot_duration = ?params.relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
);
(our_slot, t)
},
};
let parent_search_params = ParentSearchParams {
relay_parent,
para_id: params.para_id,
ancestry_lookback: max_ancestry_lookback(relay_parent, ¶ms.relay_client).await,
max_depth: PARENT_SEARCH_DEPTH,
ignore_alternative_branches: true,
};
let potential_parents =
cumulus_client_consensus_common::find_potential_parents::<Block>(
parent_search_params,
&*params.para_backend,
¶ms.relay_client,
)
.await;
let mut potential_parents = match potential_parents {
Err(e) => {
tracing::error!(
target: crate::LOG_TARGET,
?relay_parent,
err = ?e,
"Could not fetch potential parents to build upon"
);
continue
},
Ok(x) => x,
};
let included_block = match potential_parents.iter().find(|x| x.depth == 0) {
None => continue, Some(b) => b.hash,
};
let para_client = &*params.para_client;
let keystore = ¶ms.keystore;
let can_build_upon = |block_hash| {
can_build_upon::<_, _, P>(
slot_now,
timestamp,
block_hash,
included_block,
para_client,
&keystore,
)
};
potential_parents.sort_by_key(|a| a.depth);
let initial_parent = match potential_parents.pop() {
None => continue,
Some(p) => p,
};
let mut parent_hash = initial_parent.hash;
let mut parent_header = initial_parent.header;
let overseer_handle = &mut params.overseer_handle;
for n_built in 0..2 {
let slot_claim = match can_build_upon(parent_hash).await {
None => break,
Some(c) => c,
};
tracing::debug!(
target: crate::LOG_TARGET,
?relay_parent,
unincluded_segment_len = initial_parent.depth + n_built,
"Slot claimed. Building"
);
let validation_data = PersistedValidationData {
parent_head: parent_header.encode().into(),
relay_parent_number: *relay_parent_header.number(),
relay_parent_storage_root: *relay_parent_header.state_root(),
max_pov_size,
};
let (parachain_inherent_data, other_inherent_data) = match collator
.create_inherent_data(
relay_parent,
&validation_data,
parent_hash,
slot_claim.timestamp(),
)
.await
{
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err);
break
},
Ok(x) => x,
};
let validation_code_hash = match params.code_hash_provider.code_hash_at(parent_hash)
{
None => {
tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
break
},
Some(v) => v,
};
match collator
.collate(
&parent_header,
&slot_claim,
None,
(parachain_inherent_data, other_inherent_data),
params.authoring_duration,
(validation_data.max_pov_size / 2) as usize,
)
.await
{
Ok((collation, block_data, new_block_hash)) => {
collator.collator_service().announce_block(new_block_hash, None);
overseer_handle
.send_msg(
CollationGenerationMessage::SubmitCollation(
SubmitCollationParams {
relay_parent,
collation,
parent_head: parent_header.encode().into(),
validation_code_hash,
result_sender: None,
},
),
"SubmitCollation",
)
.await;
parent_hash = new_block_hash;
parent_header = block_data.into_header();
},
Err(err) => {
tracing::error!(target: crate::LOG_TARGET, ?err);
break
},
}
}
}
}
}
/// Checks whether this node may author a block on top of `parent_hash` in `slot`.
///
/// The steps, in order:
///
/// 1. Read the authority set from the runtime at `parent_hash`.
/// 2. Try to claim `slot` with a key from `keystore`.
/// 3. Unless `parent_hash` is the included block itself, ask the runtime's `can_build_upon`
///    API whether building on `parent_hash` is allowed.
///
/// Returns a [`SlotClaim`] carrying the claimed key, `slot` and `timestamp` on success.
/// Returns `None` if the slot cannot be claimed, if the runtime refuses, or if either
/// runtime API call fails (errors are discarded, not reported).
async fn can_build_upon<Block: BlockT, Client, P>(
    slot: Slot,
    timestamp: Timestamp,
    parent_hash: Block::Hash,
    included_block: Block::Hash,
    client: &Client,
    keystore: &KeystorePtr,
) -> Option<SlotClaim<P::Public>>
where
    Client: ProvideRuntimeApi<Block>,
    Client::Api: AuraApi<Block, P::Public> + AuraUnincludedSegmentApi<Block>,
    P: Pair,
    P::Public: Codec,
    P::Signature: Codec,
{
    let api = client.runtime_api();

    let authority_set = api.authorities(parent_hash).ok()?;
    let claimed_key = aura_internal::claim_slot::<P>(slot, &authority_set, keystore).await?;

    // The runtime query is skipped when building directly on the included block —
    // presumably because that case is always permitted; confirm against the runtime API docs.
    let builds_on_included = parent_hash == included_block;
    if !builds_on_included {
        let permitted = api.can_build_upon(parent_hash, included_block, slot).ok()?;
        if !permitted {
            return None;
        }
    }

    Some(SlotClaim::unchecked::<P>(claimed_key, slot, timestamp))
}
/// Returns the ancestry lookback used for the potential-parent search.
///
/// Both arguments are currently ignored and a fixed value of `2` is returned; the signature
/// suggests the value is meant to be derived from relay chain state at `_relay_parent` —
/// TODO confirm.
async fn max_ancestry_lookback(
    _relay_parent: PHash,
    _relay_client: &impl RelayChainInterface,
) -> usize {
    // Fixed value returned regardless of the relay parent.
    const FIXED_LOOKBACK: usize = 2;

    FIXED_LOOKBACK
}