#![deny(
bad_style,
const_err,
improper_ctypes,
missing_docs,
non_shorthand_field_patterns,
no_mangle_generic_items,
overflowing_literals,
path_statements,
patterns_in_fns_without_body,
private_in_public,
unconditional_recursion,
unused_allocation,
unused_comparisons,
unused_parens,
while_true,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
clippy::all
)]
#![allow(clippy::type_complexity)]
#[macro_use]
extern crate substrate_subxt_proc_macro;
pub use sp_core;
pub use sp_runtime;
use codec::{
Codec,
Decode,
};
use futures::future;
use jsonrpsee_http_client::{
HttpClient,
HttpConfig,
};
use jsonrpsee_ws_client::{
WsClient,
WsConfig,
WsSubscription as Subscription,
};
use sp_core::{
storage::{
StorageChangeSet,
StorageData,
StorageKey,
},
Bytes,
};
pub use sp_runtime::traits::SignedExtension;
pub use sp_version::RuntimeVersion;
use std::{
marker::PhantomData,
sync::Arc,
};
mod error;
mod events;
pub mod extrinsic;
mod frame;
mod metadata;
mod rpc;
mod runtimes;
mod subscription;
#[cfg(test)]
mod tests;
pub use crate::{
error::{
Error,
ModuleError,
RuntimeError,
},
events::{
EventTypeRegistry,
EventsDecoder,
RawEvent,
},
extrinsic::{
PairSigner,
SignedExtra,
Signer,
UncheckedExtrinsic,
},
frame::*,
metadata::{
Metadata,
MetadataError,
},
rpc::{
BlockNumber,
ExtrinsicSuccess,
ReadProof,
RpcClient,
SystemProperties,
},
runtimes::*,
subscription::{
EventStorageSubscription,
EventSubscription,
FinalizedEventStorageSubscription,
},
substrate_subxt_proc_macro::*,
};
use crate::{
frame::system::{
AccountStoreExt,
Phase,
System,
},
rpc::{
ChainBlock,
Rpc,
},
};
#[derive(Default)]
pub struct ClientBuilder<T: Runtime> {
url: Option<String>,
client: Option<RpcClient>,
page_size: Option<u32>,
event_type_registry: EventTypeRegistry<T>,
skip_type_sizes_check: bool,
accept_weak_inclusion: bool,
}
impl<T: Runtime> ClientBuilder<T> {
pub fn new() -> Self {
Self {
url: None,
client: None,
page_size: None,
event_type_registry: EventTypeRegistry::new(),
skip_type_sizes_check: false,
accept_weak_inclusion: false,
}
}
pub fn set_client<C: Into<RpcClient>>(mut self, client: C) -> Self {
self.client = Some(client.into());
self
}
pub fn set_url<P: Into<String>>(mut self, url: P) -> Self {
self.url = Some(url.into());
self
}
pub fn set_page_size(mut self, size: u32) -> Self {
self.page_size = Some(size);
self
}
pub fn register_type_size<U>(mut self, name: &str) -> Self
where
U: Codec + Send + Sync + 'static,
{
self.event_type_registry.register_type_size::<U>(name);
self
}
pub fn skip_type_sizes_check(mut self) -> Self {
self.skip_type_sizes_check = true;
self
}
pub fn accept_weak_inclusion(mut self) -> Self {
self.accept_weak_inclusion = true;
self
}
pub async fn build<'a>(self) -> Result<Client<T>, Error> {
let client = if let Some(client) = self.client {
client
} else {
let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944");
if url.starts_with("ws://") || url.starts_with("wss://") {
let mut config = WsConfig::with_url(&url);
config.max_notifs_per_subscription = 4096;
RpcClient::WebSocket(Arc::new(WsClient::new(config).await?))
} else {
let client = HttpClient::new(url, HttpConfig::default())?;
RpcClient::Http(Arc::new(client))
}
};
let mut rpc = Rpc::new(client);
if self.accept_weak_inclusion {
rpc.accept_weak_inclusion();
}
let (metadata, genesis_hash, runtime_version, properties) = future::join4(
rpc.metadata(),
rpc.genesis_hash(),
rpc.runtime_version(None),
rpc.system_properties(),
)
.await;
let metadata = metadata?;
if let Err(missing) = self.event_type_registry.check_missing_type_sizes(&metadata)
{
if self.skip_type_sizes_check {
log::warn!(
"The following types do not have registered type segmenters: {:?} \
If any events containing these types are received, this can cause a \
`TypeSizeUnavailable` error and prevent decoding the actual event \
being listened for.\
\
Use `ClientBuilder::register_type_size` to register missing type sizes.",
missing
);
} else {
return Err(Error::MissingTypeSizes(missing.into_iter().collect()))
}
}
let events_decoder =
EventsDecoder::new(metadata.clone(), self.event_type_registry);
Ok(Client {
rpc,
genesis_hash: genesis_hash?,
metadata,
events_decoder,
properties: properties.unwrap_or_else(|_| Default::default()),
runtime_version: runtime_version?,
_marker: PhantomData,
page_size: self.page_size.unwrap_or(10),
})
}
}
pub struct Client<T: Runtime> {
rpc: Rpc<T>,
genesis_hash: T::Hash,
metadata: Metadata,
events_decoder: EventsDecoder<T>,
properties: SystemProperties,
runtime_version: RuntimeVersion,
_marker: PhantomData<(fn() -> T::Signature, T::Extra)>,
page_size: u32,
}
impl<T: Runtime> Clone for Client<T> {
fn clone(&self) -> Self {
Self {
rpc: self.rpc.clone(),
genesis_hash: self.genesis_hash,
metadata: self.metadata.clone(),
events_decoder: self.events_decoder.clone(),
properties: self.properties.clone(),
runtime_version: self.runtime_version.clone(),
_marker: PhantomData,
page_size: self.page_size,
}
}
}
pub struct KeyIter<T: Runtime, F: Store<T>> {
client: Client<T>,
_marker: PhantomData<F>,
count: u32,
hash: T::Hash,
start_key: Option<StorageKey>,
buffer: Vec<(StorageKey, StorageData)>,
}
impl<T: Runtime, F: Store<T>> KeyIter<T, F> {
pub async fn next(&mut self) -> Result<Option<(StorageKey, F::Returns)>, Error> {
loop {
if let Some((k, v)) = self.buffer.pop() {
return Ok(Some((k, Decode::decode(&mut &v.0[..])?)))
} else {
let keys = self
.client
.fetch_keys::<F>(self.count, self.start_key.take(), Some(self.hash))
.await?;
if keys.is_empty() {
return Ok(None)
}
self.start_key = keys.last().cloned();
let change_sets = self
.client
.rpc
.query_storage_at(&keys, Some(self.hash))
.await?;
for change_set in change_sets {
for (k, v) in change_set.changes {
if let Some(v) = v {
self.buffer.push((k, v));
}
}
}
debug_assert_eq!(self.buffer.len(), keys.len());
}
}
}
}
impl<T: Runtime> Client<T> {
pub fn genesis(&self) -> &T::Hash {
&self.genesis_hash
}
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
pub fn properties(&self) -> &SystemProperties {
&self.properties
}
pub async fn fetch_unhashed<V: Decode>(
&self,
key: StorageKey,
hash: Option<T::Hash>,
) -> Result<Option<V>, Error> {
if let Some(data) = self.rpc.storage(&key, hash).await? {
Ok(Some(Decode::decode(&mut &data.0[..])?))
} else {
Ok(None)
}
}
pub async fn fetch<F: Store<T>>(
&self,
store: &F,
hash: Option<T::Hash>,
) -> Result<Option<F::Returns>, Error> {
let key = store.key(&self.metadata)?;
self.fetch_unhashed::<F::Returns>(key, hash).await
}
pub async fn fetch_or_default<F: Store<T>>(
&self,
store: &F,
hash: Option<T::Hash>,
) -> Result<F::Returns, Error> {
if let Some(data) = self.fetch(store, hash).await? {
Ok(data)
} else {
Ok(store.default(&self.metadata)?)
}
}
pub async fn iter<F: Store<T>>(
&self,
hash: Option<T::Hash>,
) -> Result<KeyIter<T, F>, Error> {
let hash = if let Some(hash) = hash {
hash
} else {
self.block_hash(None)
.await?
.expect("didn't pass a block number; qed")
};
Ok(KeyIter {
client: self.clone(),
hash,
count: self.page_size,
start_key: None,
buffer: Default::default(),
_marker: PhantomData,
})
}
pub async fn fetch_keys<F: Store<T>>(
&self,
count: u32,
start_key: Option<StorageKey>,
hash: Option<T::Hash>,
) -> Result<Vec<StorageKey>, Error> {
let prefix = <F as Store<T>>::prefix(&self.metadata)?;
let keys = self
.rpc
.storage_keys_paged(Some(prefix), count, start_key, hash)
.await?;
Ok(keys)
}
pub async fn query_storage(
&self,
keys: Vec<StorageKey>,
from: T::Hash,
to: Option<T::Hash>,
) -> Result<Vec<StorageChangeSet<<T as System>::Hash>>, Error> {
self.rpc.query_storage(keys, from, to).await
}
pub async fn header<H>(&self, hash: Option<H>) -> Result<Option<T::Header>, Error>
where
H: Into<T::Hash> + 'static,
{
let header = self.rpc.header(hash.map(|h| h.into())).await?;
Ok(header)
}
pub async fn block_hash(
&self,
block_number: Option<BlockNumber>,
) -> Result<Option<T::Hash>, Error> {
let hash = self.rpc.block_hash(block_number).await?;
Ok(hash)
}
pub async fn finalized_head(&self) -> Result<T::Hash, Error> {
let head = self.rpc.finalized_head().await?;
Ok(head)
}
pub async fn block<H>(&self, hash: Option<H>) -> Result<Option<ChainBlock<T>>, Error>
where
H: Into<T::Hash> + 'static,
{
let block = self.rpc.block(hash.map(|h| h.into())).await?;
Ok(block)
}
pub async fn read_proof<H>(
&self,
keys: Vec<StorageKey>,
hash: Option<H>,
) -> Result<ReadProof<T::Hash>, Error>
where
H: Into<T::Hash> + 'static,
{
let proof = self.rpc.read_proof(keys, hash.map(|h| h.into())).await?;
Ok(proof)
}
pub async fn subscribe_events(&self) -> Result<EventStorageSubscription<T>, Error> {
let events = self.rpc.subscribe_events().await?;
Ok(events)
}
pub async fn subscribe_finalized_events(
&self,
) -> Result<EventStorageSubscription<T>, Error> {
let events = self.rpc.subscribe_finalized_events().await?;
Ok(events)
}
pub async fn subscribe_blocks(&self) -> Result<Subscription<T::Header>, Error> {
let headers = self.rpc.subscribe_blocks().await?;
Ok(headers)
}
pub async fn subscribe_finalized_blocks(
&self,
) -> Result<Subscription<T::Header>, Error> {
let headers = self.rpc.subscribe_finalized_blocks().await?;
Ok(headers)
}
pub fn encode<C: Call<T>>(&self, call: C) -> Result<Encoded, Error> {
Ok(self
.metadata()
.module_with_calls(C::MODULE)
.and_then(|module| module.call(C::FUNCTION, call))?)
}
pub fn create_unsigned<C: Call<T> + Send + Sync>(
&self,
call: C,
) -> Result<UncheckedExtrinsic<T>, Error> {
let call = self.encode(call)?;
Ok(extrinsic::create_unsigned::<T>(call))
}
pub async fn create_signed<C: Call<T> + Send + Sync>(
&self,
call: C,
signer: &(dyn Signer<T> + Send + Sync),
) -> Result<UncheckedExtrinsic<T>, Error>
where
<<T::Extra as SignedExtra<T>>::Extra as SignedExtension>::AdditionalSigned:
Send + Sync,
{
let account_nonce = if let Some(nonce) = signer.nonce() {
nonce
} else {
self.account(signer.account_id(), None).await?.nonce
};
let call = self.encode(call)?;
let signed = extrinsic::create_signed(
&self.runtime_version,
self.genesis_hash,
account_nonce,
call,
signer,
)
.await?;
Ok(signed)
}
pub fn events_decoder(&self) -> &EventsDecoder<T> {
&self.events_decoder
}
pub async fn submit_extrinsic(
&self,
extrinsic: UncheckedExtrinsic<T>,
) -> Result<T::Hash, Error> {
self.rpc.submit_extrinsic(extrinsic).await
}
pub async fn submit_and_watch_extrinsic(
&self,
extrinsic: UncheckedExtrinsic<T>,
) -> Result<ExtrinsicSuccess<T>, Error> {
self.rpc
.submit_and_watch_extrinsic(extrinsic, &self.events_decoder)
.await
}
pub async fn submit<C: Call<T> + Send + Sync>(
&self,
call: C,
signer: &(dyn Signer<T> + Send + Sync),
) -> Result<T::Hash, Error>
where
<<T::Extra as SignedExtra<T>>::Extra as SignedExtension>::AdditionalSigned:
Send + Sync,
{
let extrinsic = self.create_signed(call, signer).await?;
self.submit_extrinsic(extrinsic).await
}
pub async fn watch<C: Call<T> + Send + Sync>(
&self,
call: C,
signer: &(dyn Signer<T> + Send + Sync),
) -> Result<ExtrinsicSuccess<T>, Error>
where
<<T::Extra as SignedExtra<T>>::Extra as SignedExtension>::AdditionalSigned:
Send + Sync,
{
let extrinsic = self.create_signed(call, signer).await?;
self.submit_and_watch_extrinsic(extrinsic).await
}
pub async fn insert_key(
&self,
key_type: String,
suri: String,
public: Bytes,
) -> Result<(), Error> {
self.rpc.insert_key(key_type, suri, public).await
}
pub async fn rotate_keys(&self) -> Result<Bytes, Error> {
self.rpc.rotate_keys().await
}
pub async fn has_session_keys(&self, session_keys: Bytes) -> Result<bool, Error> {
self.rpc.has_session_keys(session_keys).await
}
pub async fn has_key(
&self,
public_key: Bytes,
key_type: String,
) -> Result<bool, Error> {
self.rpc.has_key(public_key, key_type).await
}
}
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Encoded(pub Vec<u8>);
impl codec::Encode for Encoded {
fn encode(&self) -> Vec<u8> {
self.0.to_owned()
}
}