use polkadot_sdk::*;
use super::*;
use crate::{
timestamp::SlotTimestampProvider, ChainInfo, ParachainSproofInherentProvider, SimnodeApiServer,
SimnodeRpcHandler,
};
use async_trait::async_trait;
use futures::{channel::mpsc, future::Either, lock::Mutex, FutureExt, StreamExt};
use jsonrpsee::{core::RpcResult, types::ErrorObjectOwned};
use num_traits::AsPrimitive;
use sc_client_api::Backend;
use sc_consensus_manual_seal::{
rpc::{ManualSeal, ManualSealApiServer},
run_manual_seal, EngineCommand, ManualSealParams,
};
use sc_consensus::{BlockImport, ImportQueue};
use sc_network::NetworkBackend;
use sc_service::{
build_network, spawn_tasks, BuildNetworkParams, PartialComponents, SpawnTasksParams,
TFullBackend, TFullClient, TaskManager,
};
use sc_transaction_pool::TransactionPoolWrapper;
use sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool};
use simnode_runtime_api::CreateTransactionApi;
use sp_api::{ApiExt, ConstructRuntimeApi, Core, ProvideRuntimeApi};
use sp_block_builder::BlockBuilder;
use sp_blockchain::HeaderBackend;
use sp_consensus::SelectChain;
use sp_consensus_aura::AuraApi;
use sp_core::{crypto::AccountId32, traits::SpawnEssentialNamed, Bytes};
use sp_runtime::traits::{Block as BlockT, Header};
use sp_transaction_pool::runtime_api::TaggedTransactionQueue;
use std::sync::Arc;
pub struct ParachainRPCHandler<T: ChainInfo> {
inner: SimnodeRpcHandler<T>,
sink: futures::channel::mpsc::Sender<
sc_consensus_manual_seal::EngineCommand<<T::Block as BlockT>::Hash>,
>,
parachain: crate::sproof::SharedParachainSproofInherentProvider<T>,
}
#[async_trait]
impl<T> SimnodeApiServer for ParachainRPCHandler<T>
where
T: ChainInfo + Send + Sync + 'static,
<T::RuntimeApi as ConstructRuntimeApi<T::Block, FullClientFor<T>>>::RuntimeApi:
CreateTransactionApi<
T::Block,
<T::Runtime as frame_system::Config>::RuntimeCall,
<T::Runtime as frame_system::Config>::AccountId,
>,
<T::Runtime as frame_system::Config>::AccountId: From<AccountId32>,
<<T::Block as BlockT>::Header as Header>::Number: num_traits::cast::AsPrimitive<u32>,
T::Runtime: staging_parachain_info::Config,
{
fn author_extrinsic(&self, call: Bytes, account: String) -> RpcResult<Bytes> {
Ok(self.inner.author_extrinsic(call, account)?.into())
}
fn revert_blocks(&self, n: u32) -> RpcResult<()> {
self.inner.revert_blocks(n)
}
async fn upgrade_signal(&self, go_ahead: bool) -> RpcResult<()> {
use futures::{channel::oneshot, SinkExt};
let signal = match go_ahead {
true => polkadot_primitives::UpgradeGoAhead::GoAhead,
false => polkadot_primitives::UpgradeGoAhead::Abort,
};
let para_id = self
.inner
.with_state(None, || staging_parachain_info::Pallet::<T::Runtime>::parachain_id());
let builder = cumulus_test_relay_sproof_builder::RelayStateSproofBuilder {
para_id,
upgrade_go_ahead: Some(signal),
..Default::default()
};
self.parachain.lock().await.update_sproof_builder(builder);
let mut sink = self.sink.clone();
let (sender, receiver) = oneshot::channel();
let command = sc_consensus_manual_seal::EngineCommand::SealNewBlock {
create_empty: true,
finalize: true,
parent_hash: None,
sender: Some(sender),
};
sink.send(command)
.await
.map_err(|_| ErrorObjectOwned::owned::<&str>(2300, "Failed to upgrade signal", None))?;
match receiver.await {
Ok(Ok(_)) => Ok(()),
Ok(Err(e)) => Err(e.into()),
Err(e) => Err(ErrorObjectOwned::owned::<&str>(2200, format!("{}", e), None)),
}
}
}
/// A [`SelectChain`] implementation for simnode parachain services.
///
/// Its `SelectChain` impl reports no leaves and always selects the client's
/// current best header.
pub struct ParachainSelectChain<Client> {
    // Handle to the backing client, used to resolve the best header.
    client: Arc<Client>,
}
impl<C> Clone for ParachainSelectChain<C> {
fn clone(&self) -> Self {
Self { client: self.client.clone() }
}
}
impl<C> ParachainSelectChain<C> {
pub fn new(client: Arc<C>) -> Self {
Self { client }
}
}
#[async_trait]
impl<B, C> SelectChain<B> for ParachainSelectChain<C>
where
B: BlockT,
C: HeaderBackend<B>,
{
async fn leaves(&self) -> Result<Vec<B::Hash>, sp_consensus::Error> {
Ok(vec![])
}
async fn best_chain(&self) -> Result<B::Header, sp_consensus::Error> {
let header = self
.client
.header(self.client.info().best_hash)
.map_err(|e| sp_consensus::Error::Other(Box::new(e)))?
.ok_or_else(|| sp_consensus::Error::StateUnavailable(format!("Header not found!")))?;
Ok(header)
}
}
/// Build and launch a simnode parachain service driven by manual seal.
///
/// Wires the supplied partial components into a full service: networking
/// (litep2p backend), optional offchain workers, RPC (the caller's handlers
/// extended with the simnode and manual-seal APIs), and a manual-seal
/// authorship task supplied with timestamp, AURA-slot and parachain
/// validation-data (sproof) inherents.
///
/// Type parameters:
/// - `C`: the [`ChainInfo`] describing the chain under test.
/// - `B`: the block type (`C::Block`).
/// - `S`: the select-chain implementation.
/// - `I`: the import queue.
/// - `BI`: the block import.
/// - `U`: extra payload type carried by `SimnodeParams`.
///
/// Returns the running [`TaskManager`] on success.
pub async fn start_simnode<C, B, S, I, BI, U>(
    params: SimnodeParams<
        TFullClient<C::Block, C::RuntimeApi, Executor>,
        TFullBackend<B>,
        S,
        TransactionPoolWrapper<B, FullClientFor<C>>,
        I,
        BI,
        U,
    >,
) -> Result<TaskManager, sc_service::Error>
where
    B: BlockT,
    C: ChainInfo<Block = B> + 'static + Send + Sync,
    I: ImportQueue<B> + 'static,
    BI: BlockImport<B, Error = sp_consensus::Error> + Send + Sync + 'static,
    S: SelectChain<B> + 'static,
    <C::RuntimeApi as ConstructRuntimeApi<B, FullClientFor<C>>>::RuntimeApi:
        Core<B>
            + TaggedTransactionQueue<B>
            + sp_offchain::OffchainWorkerApi<B>
            + sp_api::Metadata<B>
            + sp_session::SessionKeys<B>
            + ApiExt<B>
            + BlockBuilder<B>
            + sp_consensus_aura::AuraApi<B, sp_consensus_aura::sr25519::AuthorityId>
            + CreateTransactionApi<
                C::Block,
                <C::Runtime as frame_system::Config>::RuntimeCall,
                <C::Runtime as frame_system::Config>::AccountId,
            >,
    <<B as BlockT>::Header as Header>::Number: AsPrimitive<u32>,
    <B as BlockT>::Hash: Unpin,
    <B as BlockT>::Header: Unpin,
    C::Runtime: staging_parachain_info::Config,
    <C::Runtime as frame_system::Config>::RuntimeCall: Send + Sync,
    <C::Runtime as frame_system::Config>::AccountId: Send + Sync + From<AccountId32>,
{
    use sc_consensus_manual_seal::consensus::aura::AuraConsensusDataProvider;
    let SimnodeParams { components, config, instant, rpc_builder } = params;
    let PartialComponents {
        client,
        backend,
        mut task_manager,
        keystore_container,
        select_chain,
        import_queue,
        transaction_pool: pool,
        other: (block_import, mut telemetry, _),
    } = components;
    // Query the slot duration from the runtime's AURA API at the best block.
    let slot_duration = client
        .runtime_api()
        .slot_duration(client.info().best_hash)
        .map_err(|err| sc_service::Error::Application(Box::new(err)))?;
    // Shared sproof inherent provider; also mutated via the simnode RPC
    // (see `ParachainRPCHandler::upgrade_signal`).
    let parachain_inherent_provider = Arc::new(Mutex::new(
        ParachainSproofInherentProvider::<C>::new(client.clone(), slot_duration.as_millis()),
    ));
    // Network configuration and notification metrics for the litep2p backend.
    let net_config = sc_network::config::FullNetworkConfiguration::<
        B,
        B::Hash,
        sc_network::Litep2pNetworkBackend,
    >::new(&config.network, config.prometheus_registry().cloned());
    let metrics = <sc_network::Litep2pNetworkBackend as NetworkBackend<B, B::Hash>>::register_notification_metrics(
        config.prometheus_registry(),
    );
    // Build the network stack; warp sync and a custom block relay are disabled.
    let (network, system_rpc_tx, tx_handler_controller, sync_service) = {
        let params = BuildNetworkParams {
            config: &config,
            net_config,
            client: client.clone(),
            transaction_pool: pool.clone(),
            spawn_handle: task_manager.spawn_handle(),
            import_queue,
            block_announce_validator_builder: None,
            warp_sync_config: None,
            block_relay: None,
            metrics,
        };
        build_network(params)?
    };
    // Optionally run offchain workers alongside the node.
    if config.offchain_worker.enabled {
        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-worker",
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                is_validator: config.role.is_authority(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(OffchainTransactionPoolFactory::new(pool.clone())),
                network_provider: Arc::new(network.clone()),
                enable_http_requests: true,
                custom_extensions: |_| vec![],
            })?
            .run(client.clone(), task_manager.spawn_handle())
            .boxed(),
        );
    }
    // Proposer factory used by the manual-seal task to build blocks.
    let env = sc_basic_authorship::ProposerFactory::new(
        task_manager.spawn_handle(),
        client.clone(),
        pool.clone(),
        config.prometheus_registry(),
        None,
    );
    // Channel over which RPC handlers send seal/finalize commands to the
    // manual-seal task.
    let (command_sink, commands_stream) = mpsc::channel(10);
    let rpc_sink = command_sink.clone();
    let parachain_inherent_provider_clone = parachain_inherent_provider.clone();
    let (client_clone, backend_clone) = (client.clone(), backend.clone());
    let params = SpawnTasksParams {
        config,
        client: client.clone(),
        backend: backend.clone(),
        task_manager: &mut task_manager,
        keystore: keystore_container.keystore(),
        transaction_pool: pool.clone(),
        // Extend the caller-supplied RPC module with the simnode and
        // manual-seal APIs.
        rpc_builder: Box::new(move |subscription_executor| {
            let mut io = rpc_builder(subscription_executor)?;
            io.merge(
                ParachainRPCHandler {
                    inner: SimnodeRpcHandler::new(client_clone.clone(), backend_clone.clone()),
                    sink: rpc_sink.clone(),
                    parachain: parachain_inherent_provider_clone.clone(),
                }
                .into_rpc(),
            )
            .map_err(|_| sc_service::Error::Other("Unable to merge simnode rpc api".to_string()))?;
            io.merge(ManualSeal::new(rpc_sink.clone()).into_rpc()).map_err(|_| {
                sc_service::Error::Other("Unable to merge manual seal rpc api".to_string())
            })?;
            Ok(io)
        }),
        network,
        system_rpc_tx,
        tx_handler_controller,
        sync_service,
        telemetry: telemetry.as_mut(),
        tracing_execute_block: None,
    };
    spawn_tasks(params)?;
    // The manual-seal authorship task: seals on explicit commands, and in
    // `instant` mode additionally on every transaction-pool import.
    let task = run_manual_seal(ManualSealParams {
        block_import,
        env,
        client: client.clone(),
        pool: pool.clone(),
        commands_stream: if instant {
            // Instant-seal: each pool import triggers an empty-allowed,
            // immediately-finalized block, merged with RPC commands.
            let tx_notifications =
                pool.import_notification_stream().map(move |_| EngineCommand::SealNewBlock {
                    create_empty: true,
                    finalize: true,
                    parent_hash: None,
                    sender: None,
                });
            Either::Left(futures::stream::select(tx_notifications, commands_stream))
        } else {
            Either::Right(commands_stream)
        },
        select_chain,
        consensus_data_provider: Some(Box::new(AuraConsensusDataProvider::new(client.clone()))),
        // Supply timestamp, AURA-slot and parachain-system inherents for each
        // authored block.
        create_inherent_data_providers: {
            let client = client.clone();
            let parachain_inherent_provider = parachain_inherent_provider.clone();
            move |parent, _| {
                let client = client.clone();
                let parachain_sproof = parachain_inherent_provider.clone();
                async move {
                    let client = client.clone();
                    let parachain_sproof = parachain_sproof.clone();
                    // Timestamp provider seeded from AURA slot timing (see
                    // `SlotTimestampProvider::new_aura`); its slot also feeds
                    // the AURA and sproof inherents below.
                    let timestamp = SlotTimestampProvider::new_aura(client.clone(), parent)
                        .map_err(|err| format!("{:?}", err))?;
                    let aura =
                        sp_consensus_aura::inherents::InherentDataProvider::new(timestamp.slot());
                    let parachain_system = parachain_sproof
                        .lock()
                        .await
                        .create_inherent(timestamp.slot().into(), parent)?;
                    Ok((timestamp, aura, parachain_system))
                }
            }
        },
    });
    // The node cannot progress without the sealing task, so spawn it as
    // essential: its failure tears down the whole service.
    task_manager.spawn_essential_handle().spawn_essential(
        "manual-consensus-task",
        None,
        Box::pin(task),
    );
    Ok(task_manager)
}