#![allow(irrefutable_let_patterns)]
use std::convert::TryInto;
use codec::{
Decode,
Encode,
Error as CodecError,
};
use jsonrpsee::{
client::Subscription,
common::{
to_value as to_json_value,
Params,
},
Client,
};
use num_traits::bounds::Bounded;
use frame_metadata::RuntimeMetadataPrefixed;
use sp_core::{
storage::{
StorageChangeSet,
StorageData,
StorageKey,
},
twox_128,
Bytes,
};
use sp_rpc::{
list::ListOrValue,
number::NumberOrHex,
};
use sp_runtime::{
generic::{
Block,
SignedBlock,
},
traits::Hash,
};
use sp_transaction_pool::TransactionStatus;
use sp_version::RuntimeVersion;
use std::marker::PhantomData;
use crate::{
error::Error,
events::{
EventsDecoder,
RawEvent,
RuntimeEvent,
},
frame::{
balances::Balances,
system::{
Phase,
System,
SystemEvent,
},
},
metadata::Metadata,
};
/// A signed block whose header and extrinsic types are taken from the
/// [`System`] implementation of `T`. This is the payload type returned by
/// [`Rpc::block`].
pub type ChainBlock<T> =
SignedBlock<Block<<T as System>::Header, <T as System>::Extrinsic>>;
/// Block number argument accepted by [`Rpc::block_hash`]: a [`NumberOrHex`]
/// over the block number type declared by `T`'s [`System`] implementation.
pub type BlockNumber<T> = NumberOrHex<<T as System>::BlockNumber>;
/// RPC handle parameterised by the runtime description `T`.
///
/// Wraps a `jsonrpsee` [`Client`]; the methods defined on this type in this
/// file issue their requests and subscriptions through that client.
#[derive(Clone)]
pub struct Rpc<T: System> {
// Underlying JSON-RPC client used for requests and subscriptions.
client: Client,
// Zero-sized marker that ties the otherwise unused type parameter `T` to the struct.
marker: std::marker::PhantomData<T>,
}
impl<T> Rpc<T>
where
T: System,
{
pub async fn connect_ws(url: &str) -> Result<Self, Error> {
let client = jsonrpsee::ws_client(&url).await?;
Ok(Rpc {
client: client.into(),
marker: PhantomData,
})
}
/// Fetches the raw storage value under `key` via `state_getStorage` and
/// SCALE-decodes it as `V`.
///
/// `hash` is forwarded as the second RPC parameter.
///
/// Returns `Ok(None)` when the node reports no value for the key.
///
/// # Errors
///
/// Fails if the parameters cannot be serialised, the request fails, or the
/// returned bytes do not decode as `V`.
pub async fn storage<V: Decode>(
    &self,
    key: StorageKey,
    hash: Option<T::Hash>,
) -> Result<Option<V>, Error> {
    let key_json = to_json_value(key)?;
    let hash_json = to_json_value(hash)?;
    let params = Params::Array(vec![key_json, hash_json]);
    let response: Option<StorageData> =
        self.client.request("state_getStorage", params).await?;
    let decoded = match response {
        None => None,
        Some(raw) => Some(V::decode(&mut &raw.0[..])?),
    };
    Ok(decoded)
}
/// Calls `state_queryStorage` with `keys`, `from` and `to` (in that order)
/// and returns the storage change sets reported by the node.
///
/// # Errors
///
/// Fails if a parameter cannot be serialised or the request fails.
pub async fn query_storage(
    &self,
    keys: Vec<StorageKey>,
    from: T::Hash,
    to: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<<T as System>::Hash>>, Error> {
    let keys_json = to_json_value(keys)?;
    let from_json = to_json_value(from)?;
    let to_json = to_json_value(to)?;
    let params = Params::Array(vec![keys_json, from_json, to_json]);
    let change_sets = self.client.request("state_queryStorage", params).await?;
    Ok(change_sets)
}
/// Looks up the hash of the block whose number is `T::BlockNumber::min_value()`
/// via `chain_getBlockHash`.
///
/// # Errors
///
/// Fails if the request fails, if the node answers with a list instead of a
/// single value, or if the single value is `None`.
pub async fn genesis_hash(&self) -> Result<T::Hash, Error> {
    let first_number = NumberOrHex::Number(T::BlockNumber::min_value());
    let block_zero = Some(ListOrValue::Value(first_number));
    let params = Params::Array(vec![to_json_value(block_zero)?]);
    let response: ListOrValue<Option<T::Hash>> =
        self.client.request("chain_getBlockHash", params).await?;
    match response {
        ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
        ListOrValue::Value(None) => Err("Genesis hash not found".into()),
        ListOrValue::Value(Some(hash)) => Ok(hash),
    }
}
pub async fn metadata(&self) -> Result<Metadata, Error> {
let bytes: Bytes = self
.client
.request("state_getMetadata", Params::None)
.await?;
let meta: RuntimeMetadataPrefixed = Decode::decode(&mut &bytes[..])?;
let metadata: Metadata = meta.try_into()?;
Ok(metadata)
}
/// Requests a block header via `chain_getHeader`, passing `hash` as the only
/// parameter. The node's answer (possibly `None`) is returned unchanged.
///
/// # Errors
///
/// Fails if `hash` cannot be serialised or the request fails.
pub async fn header(
    &self,
    hash: Option<T::Hash>,
) -> Result<Option<T::Header>, Error> {
    let hash_json = to_json_value(hash)?;
    self.client
        .request("chain_getHeader", Params::Array(vec![hash_json]))
        .await
        .map_err(Into::into)
}
pub async fn block_hash(
&self,
block_number: Option<BlockNumber<T>>,
) -> Result<Option<T::Hash>, Error> {
let block_number = block_number.map(|bn| ListOrValue::Value(bn));
let params = Params::Array(vec![to_json_value(block_number)?]);
let list_or_value = self.client.request("chain_getBlockHash", params).await?;
match list_or_value {
ListOrValue::Value(hash) => Ok(hash),
ListOrValue::List(_) => Err("Expected a Value, got a List".into()),
}
}
/// Requests `chain_getFinalizedHead` (no parameters) and returns the hash
/// the node answers with.
///
/// # Errors
///
/// Fails if the request fails.
pub async fn finalized_head(&self) -> Result<T::Hash, Error> {
    self.client
        .request("chain_getFinalizedHead", Params::None)
        .await
        .map_err(Into::into)
}
/// Requests a full signed block via `chain_getBlock`, passing `hash` as the
/// only parameter. The node's answer (possibly `None`) is returned unchanged.
///
/// # Errors
///
/// Fails if `hash` cannot be serialised or the request fails.
pub async fn block(
    &self,
    hash: Option<T::Hash>,
) -> Result<Option<ChainBlock<T>>, Error> {
    let hash_json = to_json_value(hash)?;
    self.client
        .request("chain_getBlock", Params::Array(vec![hash_json]))
        .await
        .map_err(Into::into)
}
/// Requests the runtime version via `state_getRuntimeVersion`, passing `at`
/// as the only parameter.
///
/// # Errors
///
/// Fails if `at` cannot be serialised or the request fails.
pub async fn runtime_version(
    &self,
    at: Option<T::Hash>,
) -> Result<RuntimeVersion, Error> {
    let at_json = to_json_value(at)?;
    self.client
        .request("state_getRuntimeVersion", Params::Array(vec![at_json]))
        .await
        .map_err(Into::into)
}
}
impl<T: System + Balances + 'static> Rpc<T> {
/// Subscribes to storage changes of the key built from
/// `twox_128("System") ++ twox_128("Events")`.
///
/// Uses `state_subscribeStorage` to subscribe and `state_unsubscribeStorage`
/// to unsubscribe.
///
/// # Errors
///
/// Fails if the key list cannot be serialised or the subscription request
/// fails.
pub async fn subscribe_events(
    &self,
) -> Result<Subscription<StorageChangeSet<<T as System>::Hash>>, Error> {
    // The key is the concatenation of the two 128-bit hashes.
    let mut events_key = twox_128(b"System").to_vec();
    events_key.extend_from_slice(&twox_128(b"Events"));
    log::debug!("Events storage key {:?}", hex::encode(&events_key));

    let watched_keys = Some(vec![StorageKey(events_key)]);
    let params = Params::Array(vec![to_json_value(watched_keys)?]);
    self.client
        .subscribe("state_subscribeStorage", params, "state_unsubscribeStorage")
        .await
        .map_err(Into::into)
}
/// Subscribes to new block headers via `chain_subscribeNewHeads`.
///
/// # Errors
///
/// Fails if the subscription request fails.
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
    let subscription = self
        .client
        .subscribe(
            "chain_subscribeNewHeads",
            Params::None,
            // The third argument is the method used to cancel the
            // subscription; it must be the *unsubscribe* RPC, not the
            // subscribe one.
            "chain_unsubscribeNewHeads",
        )
        .await?;
    Ok(subscription)
}
/// Subscribes to finalized block headers via `chain_subscribeFinalizedHeads`.
///
/// # Errors
///
/// Fails if the subscription request fails.
pub async fn subscribe_finalized_blocks(
    &self,
) -> Result<Subscription<T::Header>, Error> {
    let subscription = self
        .client
        .subscribe(
            "chain_subscribeFinalizedHeads",
            Params::None,
            // The third argument is the method used to cancel the
            // subscription; it must be the *unsubscribe* RPC, not the
            // subscribe one.
            "chain_unsubscribeFinalizedHeads",
        )
        .await?;
    Ok(subscription)
}
pub async fn submit_extrinsic<E: Encode>(
&self,
extrinsic: E,
) -> Result<T::Hash, Error> {
let bytes: Bytes = extrinsic.encode().into();
let params = Params::Array(vec![to_json_value(bytes)?]);
let xt_hash = self
.client
.request("author_submitExtrinsic", params)
.await?;
Ok(xt_hash)
}
pub async fn watch_extrinsic<E: Encode>(
&self,
extrinsic: E,
) -> Result<Subscription<TransactionStatus<T::Hash, T::Hash>>, Error> {
let bytes: Bytes = extrinsic.encode().into();
let params = Params::Array(vec![to_json_value(bytes)?]);
let subscription = self
.client
.subscribe(
"author_submitAndWatchExtrinsic",
params,
"author_unwatchExtrinsic",
)
.await?;
Ok(subscription)
}
/// Submits `extrinsic`, follows its transaction status until it is reported
/// in a block, and then collects the events emitted for it.
///
/// Consumes the `Rpc` handle. The events subscription is opened *before* the
/// extrinsic is submitted.
///
/// Status handling:
/// * `Future`, `Ready` and `Broadcast` are skipped;
/// * `InBlock` fetches the block and delegates to [`wait_for_block_events`];
/// * every other status ends the call with an error.
///
/// # Errors
///
/// Fails if any RPC call fails, if the reported block cannot be fetched, if
/// a terminal status other than `InBlock` arrives first, or if
/// [`wait_for_block_events`] fails.
pub async fn submit_and_watch_extrinsic<E: Encode + 'static>(
    self,
    extrinsic: E,
    decoder: EventsDecoder<T>,
) -> Result<ExtrinsicSuccess<T>, Error> {
    let ext_hash = T::Hashing::hash_of(&extrinsic);
    log::info!("Submitting Extrinsic `{:?}`", ext_hash);

    let events_sub = self.subscribe_events().await?;
    let mut xt_sub = self.watch_extrinsic(extrinsic).await?;

    // `next()` yields a status directly, so this loop only ends through one
    // of the `return`s below.
    loop {
        let status = xt_sub.next().await;
        log::info!("received status {:?}", status);
        match status {
            TransactionStatus::Future
            | TransactionStatus::Ready
            | TransactionStatus::Broadcast(_) => continue,
            TransactionStatus::InBlock(block_hash) => {
                log::info!("Fetching block {:?}", block_hash);
                let signed_block = match self.block(Some(block_hash)).await? {
                    Some(found) => found,
                    None => {
                        return Err(
                            format!("Failed to find block {:?}", block_hash).into()
                        )
                    }
                };
                log::info!(
                    "Found block {:?}, with {} extrinsics",
                    block_hash,
                    signed_block.block.extrinsics.len()
                );
                return wait_for_block_events(
                    decoder,
                    ext_hash,
                    signed_block,
                    block_hash,
                    events_sub,
                )
                .await;
            }
            TransactionStatus::Invalid => return Err("Extrinsic Invalid".into()),
            TransactionStatus::Usurped(_) => return Err("Extrinsic Usurped".into()),
            TransactionStatus::Dropped => return Err("Extrinsic Dropped".into()),
            TransactionStatus::Retracted(_) => {
                return Err("Extrinsic Retracted".into())
            }
            TransactionStatus::Finalized(_) => {
                return Err("Extrinsic Finalized".into())
            }
            TransactionStatus::FinalityTimeout(_) => {
                return Err("Extrinsic FinalityTimeout".into())
            }
        }
    }
}
}
/// Outcome of an extrinsic that was found in a block, as produced by
/// [`wait_for_block_events`].
#[derive(Debug)]
pub struct ExtrinsicSuccess<T: System> {
/// Hash of the block the extrinsic was found in.
pub block: T::Hash,
/// Hash of the extrinsic itself.
pub extrinsic: T::Hash,
/// Events decoded for the extrinsic (those emitted in its
/// `ApplyExtrinsic` phase).
pub events: Vec<RuntimeEvent<T>>,
}
impl<T: System> ExtrinsicSuccess<T> {
/// Returns the first raw event whose `module` and `variant` fields equal the
/// given names, or `None` if there is no such event.
///
/// Only `RuntimeEvent::Raw` entries are considered.
pub fn find_event_raw(&self, module: &str, variant: &str) -> Option<&RawEvent> {
    for event in &self.events {
        if let RuntimeEvent::Raw(raw) = event {
            if raw.module == module && raw.variant == variant {
                return Some(raw);
            }
        }
    }
    None
}
/// Collects references to every `RuntimeEvent::System` entry, preserving the
/// order in which they appear in `events`.
pub fn system_events(&self) -> Vec<&SystemEvent<T>> {
    let mut system = Vec::new();
    for event in &self.events {
        if let RuntimeEvent::System(system_event) = event {
            system.push(system_event);
        }
    }
    system
}
/// Finds the first raw event matching `module` and `variant` and decodes its
/// data as `E`.
///
/// Returns `None` when no such event exists; otherwise `Some` with the
/// decoding result, which may itself be an error.
pub fn find_event<E: Decode>(
    &self,
    module: &str,
    variant: &str,
) -> Option<Result<E, CodecError>> {
    let raw = self.find_event_raw(module, variant)?;
    Some(E::decode(&mut &raw.data[..]))
}
}
pub async fn wait_for_block_events<T: System + Balances + 'static>(
decoder: EventsDecoder<T>,
ext_hash: T::Hash,
signed_block: ChainBlock<T>,
block_hash: T::Hash,
events_subscription: Subscription<StorageChangeSet<T::Hash>>,
) -> Result<ExtrinsicSuccess<T>, Error> {
let ext_index = signed_block
.block
.extrinsics
.iter()
.position(|ext| {
let hash = T::Hashing::hash_of(ext);
hash == ext_hash
})
.ok_or_else(|| {
Error::Other(format!("Failed to find Extrinsic with hash {:?}", ext_hash))
})?;
let mut subscription = events_subscription;
while let change_set = subscription.next().await {
if change_set.block != block_hash {
continue
}
let mut events = Vec::new();
for (_key, data) in change_set.changes {
if let Some(data) = data {
match decoder.decode_events(&mut &data.0[..]) {
Ok(raw_events) => {
for (phase, event) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if i as usize == ext_index {
events.push(event)
}
}
}
}
Err(err) => return Err(err.into()),
}
}
}
return if events.len() > 0 {
Ok(ExtrinsicSuccess {
block: block_hash,
extrinsic: ext_hash,
events,
})
} else {
Err(format!("No events found for block {}", block_hash).into())
}
}
unreachable!()
}