use crate::collator::SlotClaim;
use codec::Codec;
use cumulus_client_consensus_common::{self as consensus_common, ParentSearchParams};
use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
use cumulus_primitives_core::{
relay_chain::Header as RelayHeader, BlockT, KeyToIncludeInRelayProof, RelayProofRequest,
};
use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface};
use polkadot_node_subsystem::messages::{CollatorProtocolMessage, RuntimeApiRequest};
use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot;
use polkadot_primitives::{
Hash as RelayHash, Id as ParaId, OccupiedCoreAssumption, ValidationCodeHash,
DEFAULT_SCHEDULING_LOOKAHEAD,
};
use sc_consensus_aura::{standalone as aura_internal, AuraApi};
use sp_api::{ApiExt, ProvideRuntimeApi, RuntimeApiInfo};
use sp_core::Pair;
use sp_keystore::KeystorePtr;
use sp_timestamp::Timestamp;
pub mod basic;
pub mod lookahead;
pub mod slot_based;
/// Helper that pre-connects a collator to the backing validator groups shortly
/// before one of our Aura slots and disconnects again once the slot has passed.
struct BackingGroupConnectionHelper {
    // Keystore consulted to decide whether one of our keys can claim a slot.
    keystore: sp_keystore::KeystorePtr,
    // Handle used to send `CollatorProtocolMessage`s to the overseer.
    overseer_handle: OverseerHandle,
    // The upcoming slot we claimed and connected for; `None` while disconnected.
    our_slot: Option<Slot>,
}
impl BackingGroupConnectionHelper {
pub fn new(keystore: sp_keystore::KeystorePtr, overseer_handle: OverseerHandle) -> Self {
Self { keystore, overseer_handle, our_slot: None }
}
async fn send_subsystem_message(&mut self, message: CollatorProtocolMessage) {
self.overseer_handle.send_msg(message, "BackingGroupConnectionHelper").await;
}
pub async fn update<P>(&mut self, current_slot: Slot, authorities: &[P::Public])
where
P: sp_core::Pair + Send + Sync,
P::Public: Codec,
{
if Some(current_slot) <= self.our_slot {
return;
}
let next_slot = current_slot + 1;
let next_slot_is_ours =
aura_internal::claim_slot::<P>(next_slot, authorities, &self.keystore)
.await
.is_some();
if next_slot_is_ours {
if self.our_slot.is_none() {
tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is next, connecting to backing groups", next_slot);
self.send_subsystem_message(CollatorProtocolMessage::ConnectToBackingGroups)
.await;
}
self.our_slot = Some(next_slot);
} else if self.our_slot.take().is_some() {
tracing::debug!(target: crate::LOG_TARGET, "Current slot = {}, disconnecting from backing groups", current_slot);
self.send_subsystem_message(CollatorProtocolMessage::DisconnectFromBackingGroups)
.await;
}
}
}
/// Compare the locally known validation code hash against the one stored in the
/// relay chain state at `relay_parent`, logging a warning on mismatch or when
/// no code is registered for `para_id`. Purely diagnostic — never fails.
async fn check_validation_code_or_log(
    local_validation_code_hash: &ValidationCodeHash,
    para_id: ParaId,
    relay_client: &impl RelayChainInterface,
    relay_parent: RelayHash,
) {
    let fetch_result = relay_client
        .validation_code_hash(relay_parent, para_id, OccupiedCoreAssumption::Included)
        .await;

    let state_validation_code_hash = match fetch_result {
        Ok(maybe_hash) => maybe_hash,
        Err(error) => {
            tracing::debug!(
                target: super::LOG_TARGET,
                %error,
                ?relay_parent,
                %para_id,
                "Failed to fetch validation code hash",
            );
            return;
        },
    };

    if let Some(state) = state_validation_code_hash {
        if state != *local_validation_code_hash {
            tracing::warn!(
                target: super::LOG_TARGET,
                %para_id,
                ?relay_parent,
                ?local_validation_code_hash,
                relay_validation_code_hash = ?state,
                "Parachain code doesn't match validation code stored in the relay chain state.",
            );
        }
    } else {
        tracing::warn!(
            target: super::LOG_TARGET,
            %para_id,
            ?relay_parent,
            "Could not find validation code for parachain in the relay chain state.",
        );
    }
}
/// Fetch the scheduling lookahead from the relay chain at `relay_parent`.
///
/// Returns `None` when the relay runtime is too old to expose the
/// scheduling-lookahead runtime API, or when any involved call fails
/// (failures are logged).
async fn scheduling_lookahead(
    relay_parent: RelayHash,
    relay_client: &impl RelayChainInterface,
) -> Option<u32> {
    let runtime_api_version = relay_client
        .version(relay_parent)
        .await
        .map_err(|e| {
            tracing::error!(
                target: crate::LOG_TARGET,
                error = ?e,
                "Failed to fetch relay chain runtime version.",
            )
        })
        .ok()?;

    // Which version of the `ParachainHost` runtime API the relay runtime implements.
    let parachain_host_runtime_api_version = runtime_api_version
        .api_version(
            &<dyn polkadot_primitives::runtime_api::ParachainHost<polkadot_primitives::Block>>::ID,
        )
        .unwrap_or_default();

    // Relay runtimes predating the scheduling-lookahead API cannot answer the query.
    if parachain_host_runtime_api_version <
        RuntimeApiRequest::SCHEDULING_LOOKAHEAD_RUNTIME_REQUIREMENT
    {
        return None;
    }

    match relay_client.scheduling_lookahead(relay_parent).await {
        Ok(scheduling_lookahead) => Some(scheduling_lookahead),
        Err(err) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?err,
                ?relay_parent,
                "Failed to fetch scheduling lookahead from relay chain",
            );
            None
        },
    }
}
/// Fetch the claim queue snapshot at `relay_parent`, falling back to an empty
/// snapshot (and logging an error) if the runtime API call fails.
async fn claim_queue_at(
    relay_parent: RelayHash,
    relay_client: &impl RelayChainInterface,
) -> ClaimQueueSnapshot {
    relay_client
        .claim_queue(relay_parent)
        .await
        .map(Into::into)
        .unwrap_or_else(|error| {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?error,
                ?relay_parent,
                "Failed to query claim queue runtime API",
            );
            Default::default()
        })
}
/// Try to claim Aura slot `para_slot` for authoring on top of `parent_hash`.
///
/// Returns `Some(SlotClaim)` when one of our keystore keys is the slot author
/// AND (unless `parent_hash` is already the latest included block) the
/// runtime's `can_build_upon` check passes; `None` otherwise.
async fn can_build_upon<Block: BlockT, Client, P>(
    para_slot: Slot,
    relay_slot: Slot,
    timestamp: Timestamp,
    parent_hash: Block::Hash,
    included_block: Block::Hash,
    client: &Client,
    keystore: &KeystorePtr,
) -> Option<SlotClaim<P::Public>>
where
    Client: ProvideRuntimeApi<Block>,
    Client::Api: AuraApi<Block, P::Public> + AuraUnincludedSegmentApi<Block> + ApiExt<Block>,
    P: Pair,
    P::Public: Codec,
    P::Signature: Codec,
{
    let mut runtime_api = client.runtime_api();
    // NOTE(review): calls run with the `Onchain` call context — presumably so
    // the runtime behaves as it would during block execution; confirm rationale.
    runtime_api.set_call_context(sp_core::traits::CallContext::Onchain);
    let authorities = runtime_api.authorities(parent_hash).ok()?;
    // Bail out early if none of our keys is the author for `para_slot`.
    let author_pub = aura_internal::claim_slot::<P>(para_slot, &authorities, keystore).await?;
    // Building directly on the included block needs no unincluded-segment check.
    if parent_hash == included_block {
        return Some(SlotClaim::unchecked::<P>(author_pub, para_slot, timestamp));
    }
    let api_version = runtime_api
        .api_version::<dyn AuraUnincludedSegmentApi<Block>>(parent_hash)
        .ok()
        .flatten()?;
    // API versions > 1 take the relay-chain slot; version 1 takes the para slot.
    let slot = if api_version > 1 { relay_slot } else { para_slot };
    runtime_api
        .can_build_upon(parent_hash, included_block, slot)
        .ok()?
        .then(|| SlotClaim::unchecked::<P>(author_pub, para_slot, timestamp))
}
/// Search the parachain backend for a block to build the next collation on,
/// anchored at `relay_parent`. Logs and returns `None` when no suitable parent
/// can be found or the search fails.
async fn find_parent<Block>(
    relay_parent: RelayHash,
    para_id: ParaId,
    para_backend: &impl sc_client_api::Backend<Block>,
    relay_client: &impl RelayChainInterface,
) -> Option<consensus_common::ParentSearchResult<Block>>
where
    Block: BlockT,
{
    // How far back in relay-chain ancestry the parent search may look.
    let ancestry_lookback = scheduling_lookahead(relay_parent, relay_client)
        .await
        .unwrap_or(DEFAULT_SCHEDULING_LOOKAHEAD)
        .saturating_sub(1) as usize;

    let search_result = cumulus_client_consensus_common::find_parent_for_building::<Block>(
        ParentSearchParams { relay_parent, para_id, ancestry_lookback },
        para_backend,
        relay_client,
    )
    .await;

    match search_result {
        Ok(Some(result)) => Some(result),
        Ok(None) => {
            tracing::warn!(
                target: crate::LOG_TARGET,
                ?relay_parent,
                "Could not find parent to build upon.",
            );
            None
        },
        Err(e) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?relay_parent,
                err = ?e,
                "Could not find parent to build upon"
            );
            None
        },
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collators::{can_build_upon, BackingGroupConnectionHelper};
    use codec::Encode;
    use cumulus_primitives_aura::Slot;
    use cumulus_primitives_core::BlockT;
    use cumulus_relay_chain_interface::PHash;
    use cumulus_test_client::{
        runtime::{Block, Hash},
        Client, DefaultTestClientBuilderExt, InitBlockBuilder, TestClientBuilder,
        TestClientBuilderExt,
    };
    use cumulus_test_relay_sproof_builder::RelayStateSproofBuilder;
    use futures::StreamExt;
    use polkadot_overseer::{Event, Handle};
    use polkadot_primitives::HeadData;
    use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy};
    use sp_consensus::BlockOrigin;
    use sp_keystore::{Keystore, KeystorePtr};
    use sp_timestamp::Timestamp;
    use std::sync::{Arc, Mutex};

    // Import `block` into `importer`, optionally marking it as the new best block.
    async fn import_block<I: BlockImport<Block>>(
        importer: &I,
        block: Block,
        origin: BlockOrigin,
        import_as_best: bool,
    ) {
        let (header, body) = block.deconstruct();
        let mut block_import_params = BlockImportParams::new(origin, header);
        block_import_params.fork_choice = Some(ForkChoiceStrategy::Custom(import_as_best));
        block_import_params.body = Some(body);
        importer.import_block(block_import_params).await.unwrap();
    }

    // Build a relay state sproof whose included para head is the header of `hash`.
    fn sproof_with_parent_by_hash(client: &Client, hash: PHash) -> RelayStateSproofBuilder {
        let header = client.header(hash).ok().flatten().expect("No header for parent block");
        let included = HeadData(header.encode());
        let mut builder = RelayStateSproofBuilder::default();
        builder.para_id = cumulus_test_client::runtime::PARACHAIN_ID.into();
        builder.included_para_head = Some(included);
        builder
    }

    // Author a block with `included` as the included para head in the relay
    // sproof, then import it as the new best block.
    async fn build_and_import_block(client: &Client, included: Hash) -> Block {
        let sproof = sproof_with_parent_by_hash(client, included);
        let block_builder = client.init_block_builder(None, sproof).block_builder;
        let block = block_builder.build().unwrap().block;
        let origin = BlockOrigin::NetworkInitialSync;
        import_block(client, block.clone(), origin, true).await;
        block
    }

    // Create a test client and a keystore seeded with the first
    // `num_authorities` well-known sr25519 Aura keys.
    fn set_up_components(num_authorities: usize) -> (Arc<Client>, KeystorePtr) {
        let keystore = Arc::new(sp_keystore::testing::MemoryKeystore::new()) as Arc<_>;
        for key in sp_keyring::Sr25519Keyring::iter().take(num_authorities) {
            Keystore::sr25519_generate_new(
                &*keystore,
                sp_application_crypto::key_types::AURA,
                Some(&key.to_seed()),
            )
            .expect("Can insert key into MemoryKeyStore");
        }
        (Arc::new(TestClientBuilder::new().build()), keystore)
    }

    // Extend the unincluded segment until `can_build_upon` refuses, then verify
    // that building is allowed again once the tip itself counts as included.
    #[tokio::test]
    async fn test_can_build_upon() {
        let (client, keystore) = set_up_components(6);
        let genesis_hash = client.chain_info().genesis_hash;
        let mut last_hash = genesis_hash;
        while can_build_upon::<_, _, sp_consensus_aura::sr25519::AuthorityPair>(
            Slot::from(u64::MAX),
            Slot::from(u64::MAX),
            Timestamp::default(),
            last_hash,
            genesis_hash,
            &*client,
            &keystore,
        )
        .await
        .is_some()
        {
            let block = build_and_import_block(&client, genesis_hash).await;
            last_hash = block.header().hash();
        }
        let result = can_build_upon::<_, _, sp_consensus_aura::sr25519::AuthorityPair>(
            Slot::from(u64::MAX),
            Slot::from(u64::MAX),
            Timestamp::default(),
            last_hash,
            last_hash,
            &*client,
            &keystore,
        )
        .await;
        assert!(result.is_some());
    }

    // Spawn a task that records every `CollatorProtocolMessage` sent through the
    // returned overseer handle, for later inspection by the tests.
    fn create_overseer_handle() -> (OverseerHandle, Arc<Mutex<Vec<CollatorProtocolMessage>>>) {
        let messages = Arc::new(Mutex::new(Vec::new()));
        let messages_clone = messages.clone();
        let (tx, mut rx) = polkadot_node_subsystem_util::metered::channel(100);
        tokio::spawn(async move {
            while let Some(event) = rx.next().await {
                if let Event::MsgToSubsystem { msg, .. } = event {
                    if let polkadot_node_subsystem::AllMessages::CollatorProtocol(cp_msg) = msg {
                        messages_clone.lock().unwrap().push(cp_msg);
                    }
                }
            }
        });
        (Handle::new(tx), messages)
    }

    // With a single authority every slot is ours: updating at slot 5 should
    // connect and claim slot 6.
    #[tokio::test]
    async fn preconnect_when_next_slot_is_ours() {
        let (client, keystore) = set_up_components(1);
        let genesis_hash = client.chain_info().genesis_hash;
        let (overseer_handle, messages_recorder) = create_overseer_handle();
        let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
        let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
            .await;
        // Give the recorder task time to drain the channel.
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        let messages = messages_recorder.lock().unwrap();
        assert_eq!(messages.len(), 1);
        assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
        assert_eq!(helper.our_slot, Some(Slot::from(6)));
    }

    // Repeated updates for the same or already-claimed slot must not re-send the
    // connect message.
    #[tokio::test]
    async fn preconnect_no_duplicate_connect_message() {
        let (client, keystore) = set_up_components(1);
        let genesis_hash = client.chain_info().genesis_hash;
        let (overseer_handle, messages_recorder) = create_overseer_handle();
        let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
        let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        assert_eq!(messages_recorder.lock().unwrap().len(), 1);
        messages_recorder.lock().unwrap().clear();
        // Same slot again: short-circuits, no message.
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        assert_eq!(messages_recorder.lock().unwrap().len(), 0);
        // Claimed slot arrives: still connected, no duplicate message.
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(6), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        assert_eq!(messages_recorder.lock().unwrap().len(), 0);
    }

    // NOTE(review): with one authority every slot is ours, so the disconnect path
    // here appears to rely on the keystore/claim behavior at the jumped-to slot —
    // confirm against `claim_slot` semantics.
    #[tokio::test]
    async fn preconnect_disconnect_when_slot_passes() {
        let (client, keystore) = set_up_components(1);
        let genesis_hash = client.chain_info().genesis_hash;
        let (overseer_handle, messages_recorder) = create_overseer_handle();
        let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
        let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        assert_eq!(helper.our_slot, Some(Slot::from(6)));
        messages_recorder.lock().unwrap().clear();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(8), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        {
            let messages = messages_recorder.lock().unwrap();
            assert_eq!(messages.len(), 1, "Expected exactly one disconnect message");
            assert!(matches!(messages[0], CollatorProtocolMessage::DisconnectFromBackingGroups));
            assert_eq!(helper.our_slot, None);
        }
        messages_recorder.lock().unwrap().clear();
        // A second update at the same slot must not send another disconnect.
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(8), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        let messages = messages_recorder.lock().unwrap();
        assert_eq!(messages.len(), 0, "Expected no messages");
        assert_eq!(helper.our_slot, None);
    }

    // If we never connected, losing the next slot must not emit a disconnect.
    #[tokio::test]
    async fn preconnect_no_disconnect_without_previous_connection() {
        let (client, keystore) = set_up_components(1);
        let genesis_hash = client.chain_info().genesis_hash;
        let (overseer_handle, messages_recorder) = create_overseer_handle();
        let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
        let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(1), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        assert_eq!(messages_recorder.lock().unwrap().len(), 0);
        assert_eq!(helper.our_slot, None);
    }

    // Full connect -> disconnect -> reconnect cycle over several slot updates.
    #[tokio::test]
    async fn preconnect_multiple_cycles() {
        let (client, keystore) = set_up_components(1);
        let genesis_hash = client.chain_info().genesis_hash;
        let (overseer_handle, messages_recorder) = create_overseer_handle();
        let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
        let authorities = client.runtime_api().authorities(genesis_hash).unwrap();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(5), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        {
            let messages = messages_recorder.lock().unwrap();
            assert_eq!(messages.len(), 1);
            assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
        }
        assert_eq!(helper.our_slot, Some(Slot::from(6)));
        messages_recorder.lock().unwrap().clear();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(7), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        {
            let messages = messages_recorder.lock().unwrap();
            assert_eq!(messages.len(), 1);
            assert!(matches!(messages[0], CollatorProtocolMessage::DisconnectFromBackingGroups));
        }
        assert_eq!(helper.our_slot, None);
        messages_recorder.lock().unwrap().clear();
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(11), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        {
            let messages = messages_recorder.lock().unwrap();
            assert_eq!(messages.len(), 1);
            assert!(matches!(messages[0], CollatorProtocolMessage::ConnectToBackingGroups));
        }
        assert_eq!(helper.our_slot, Some(Slot::from(12)));
    }

    // An empty authority set means no slot can ever be claimed: no messages.
    #[tokio::test]
    async fn preconnect_handles_empty_authorities() {
        let keystore = Arc::new(sp_keystore::testing::MemoryKeystore::new()) as Arc<_>;
        let (overseer_handle, messages_recorder) = create_overseer_handle();
        let mut helper = BackingGroupConnectionHelper::new(keystore, overseer_handle);
        let authorities = vec![];
        helper
            .update::<sp_consensus_aura::sr25519::AuthorityPair>(Slot::from(0), &authorities)
            .await;
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
        assert_eq!(messages_recorder.lock().unwrap().len(), 0);
    }
}
/// Ask the parachain runtime (at `parent_hash`) which relay-chain storage keys
/// should be included in the relay state proof. Falls back to an empty request
/// (with a debug log) if the runtime call fails.
fn get_relay_proof_request<Block, Client>(
    client: &Client,
    parent_hash: Block::Hash,
) -> RelayProofRequest
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>,
    Client::Api: KeyToIncludeInRelayProof<Block>,
{
    match client.runtime_api().keys_to_prove(parent_hash) {
        Ok(request) => request,
        Err(e) => {
            tracing::debug!(
                target: crate::LOG_TARGET,
                error = ?e,
                "Failed to fetch relay proof requests from runtime, using empty request"
            );
            Default::default()
        },
    }
}
/// A relay-chain parent header bundled with newer relay headers that descend
/// from it.
pub struct RelayParentData {
    // The relay-chain header this data is anchored to.
    relay_parent: RelayHeader,
    // Relay headers building on `relay_parent`; may be empty.
    descendants: Vec<RelayHeader>,
}
impl RelayParentData {
    /// Construct from a relay parent with no descendants.
    pub fn new(relay_parent: RelayHeader) -> Self {
        Self { relay_parent, descendants: Vec::new() }
    }

    /// Construct from a relay parent together with its descendant headers.
    pub fn new_with_descendants(relay_parent: RelayHeader, descendants: Vec<RelayHeader>) -> Self {
        Self { relay_parent, descendants }
    }

    /// Borrow the relay parent header.
    pub fn relay_parent(&self) -> &RelayHeader {
        &self.relay_parent
    }

    /// Number of descendant headers (test-only helper).
    #[cfg(test)]
    pub fn descendants_len(&self) -> usize {
        self.descendants.len()
    }

    /// Consume `self` and produce the header list for the inherent: the relay
    /// parent followed by its descendants — or an entirely empty list when
    /// there are no descendants.
    pub fn into_inherent_descendant_list(self) -> Vec<RelayHeader> {
        if self.descendants.is_empty() {
            return Vec::new();
        }

        let mut headers = Vec::with_capacity(1 + self.descendants.len());
        headers.push(self.relay_parent);
        headers.extend(self.descendants);
        headers
    }
}