mod block_number_or_ref;
mod blocks;
use super::ClientAtBlock;
use super::OfflineClientAtBlockT;
use crate::backend::{Backend, BlockRef};
use crate::config::{Config, HashFor, Hasher, Header};
use crate::error::OnlineClientError;
use crate::error::{BlocksError, OnlineClientAtBlockError};
use crate::metadata::{ArcMetadata, Metadata};
use crate::transactions::TransactionsClient;
use codec::{Compact, Decode, Encode};
use core::marker::PhantomData;
use frame_decode::helpers::ToTypeRegistry;
use frame_metadata::{RuntimeMetadata, RuntimeMetadataPrefixed};
use scale_info_legacy::TypeRegistrySet;
use std::sync::Arc;
pub use block_number_or_ref::BlockNumberOrRef;
pub use blocks::{Block, Blocks};
/// A client for working with a chain through a live [`Backend`].
///
/// All state lives behind a single [`Arc`], so cloning this handle only clones that pointer.
#[derive(Clone, Debug)]
pub struct OnlineClient<T: Config> {
// State (config, genesis hash and backend) shared by every clone of this client.
inner: Arc<OnlineClientInner<T>>,
}
/// The state shared between all clones of an [`OnlineClient`].
struct OnlineClientInner<T: Config> {
/// The chain configuration. It is consulted for a known genesis hash, spec/transaction
/// versions, metadata and legacy types, and is handed newly fetched metadata to store.
config: T,
/// The genesis hash: taken from `config` when it provides one, else fetched from `backend`.
genesis_hash: HashFor<T>,
/// The type-erased backend through which all chain requests are made.
backend: Arc<dyn Backend<T>>,
}
// Hand-written `Debug`: the `config` and `backend` fields are printed as fixed placeholder
// strings instead of being formatted, and `genesis_hash` is not printed at all.
impl<T: Config> std::fmt::Debug for OnlineClientInner<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("OnlineClientInner");
        out.field("config", &"<config>");
        out.field("backend", &"Arc<backend impl>");
        out.finish()
    }
}
impl<T: Config> OnlineClient<T> {
/// Construct an [`OnlineClient`] using `T::default()` as the configuration.
///
/// This simply forwards to [`OnlineClient::new_with_config`], which decides the URL to
/// connect to.
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn new() -> Result<OnlineClient<T>, OnlineClientError>
where
    T: Default,
{
    Self::new_with_config(T::default()).await
}
/// Construct an [`OnlineClient`] with the given configuration, connecting to
/// `ws://127.0.0.1:9944` via [`OnlineClient::from_url_with_config`].
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn new_with_config(config: T) -> Result<OnlineClient<T>, OnlineClientError> {
    // Loopback WebSocket address used when the caller does not supply a URL.
    const LOCAL_NODE_URL: &str = "ws://127.0.0.1:9944";
    Self::from_url_with_config(config, LOCAL_NODE_URL).await
}
/// Construct an [`OnlineClient`] connected to `url`, using `T::default()` as the configuration.
///
/// See [`OnlineClient::from_url_with_config`] for the checks applied to the URL.
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn from_url(url: impl AsRef<str>) -> Result<OnlineClient<T>, OnlineClientError>
where
    T: Default,
{
    Self::from_url_with_config(T::default(), url).await
}
/// Construct an [`OnlineClient`] connected to `url`, using the given configuration.
///
/// # Errors
///
/// Returns [`OnlineClientError::RpcError`] if `subxt_rpcs::utils::url_is_secure` fails, or
/// (wrapping `subxt_rpcs::Error::InsecureUrl`) if it reports the URL as not secure. Use
/// [`OnlineClient::from_insecure_url_with_config`] to skip this check.
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn from_url_with_config(
    config: T,
    url: impl AsRef<str>,
) -> Result<OnlineClient<T>, OnlineClientError> {
    let url_str = url.as_ref();
    let is_secure =
        subxt_rpcs::utils::url_is_secure(url_str).map_err(OnlineClientError::RpcError)?;
    if !is_secure {
        let insecure = subxt_rpcs::Error::InsecureUrl(url_str.to_string());
        return Err(OnlineClientError::RpcError(insecure));
    }
    // The URL passed the check, so the unchecked constructor can take over from here.
    Self::from_insecure_url_with_config(config, url).await
}
/// Construct an [`OnlineClient`] connected to `url` without checking whether the URL is
/// secure, using `T::default()` as the configuration.
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn from_insecure_url(
    url: impl AsRef<str>,
) -> Result<OnlineClient<T>, OnlineClientError>
where
    T: Default,
{
    Self::from_insecure_url_with_config(T::default(), url).await
}
/// Construct an [`OnlineClient`] connected to `url` without checking whether the URL is
/// secure, using the given configuration.
///
/// An RPC client is created from the URL and then handed to
/// [`OnlineClient::from_rpc_client_with_config`].
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn from_insecure_url_with_config(
    config: T,
    url: impl AsRef<str>,
) -> Result<OnlineClient<T>, OnlineClientError> {
    let connected = subxt_rpcs::RpcClient::from_insecure_url(url).await?;
    Self::from_rpc_client_with_config(config, connected).await
}
/// Construct an [`OnlineClient`] on top of an existing RPC client, using `T::default()` as
/// the configuration.
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn from_rpc_client(
    rpc_client: impl Into<subxt_rpcs::RpcClient>,
) -> Result<OnlineClient<T>, OnlineClientError>
where
    T: Default,
{
    Self::from_rpc_client_with_config(T::default(), rpc_client).await
}
/// Construct an [`OnlineClient`] on top of an existing RPC client, using the given
/// configuration.
///
/// A `CombinedBackend` is built around the RPC client (via `build_with_background_driver`)
/// and then passed to [`OnlineClient::from_backend_with_config`].
///
/// # Errors
///
/// Returns [`OnlineClientError::CannotBuildCombinedBackend`] if the backend cannot be built.
#[cfg(all(feature = "jsonrpsee", feature = "runtime"))]
pub async fn from_rpc_client_with_config(
    config: T,
    rpc_client: impl Into<subxt_rpcs::RpcClient>,
) -> Result<OnlineClient<T>, OnlineClientError> {
    let client: subxt_rpcs::RpcClient = rpc_client.into();
    let built = crate::backend::CombinedBackend::builder()
        .build_with_background_driver(client)
        .await;
    let backend = Arc::new(built.map_err(OnlineClientError::CannotBuildCombinedBackend)?);
    Self::from_backend_with_config(config, backend).await
}
pub async fn from_backend<B: Backend<T>>(
backend: Arc<B>,
) -> Result<OnlineClient<T>, OnlineClientError>
where
T: Default,
{
OnlineClient::from_backend_with_config(Default::default(), backend).await
}
/// Construct an [`OnlineClient`] on top of the given backend and configuration.
///
/// The genesis hash is taken from `config` when it provides one; only otherwise is the
/// backend asked for it.
///
/// # Errors
///
/// Returns [`OnlineClientError::CannotGetGenesisHash`] if the backend has to be asked for the
/// genesis hash and that request fails.
pub async fn from_backend_with_config<B: Backend<T>>(
    config: T,
    backend: Arc<B>,
) -> Result<OnlineClient<T>, OnlineClientError> {
    let genesis_hash = if let Some(known) = config.genesis_hash() {
        known
    } else {
        backend
            .genesis_hash()
            .await
            .map_err(OnlineClientError::CannotGetGenesisHash)?
    };
    // Storing the backend in the struct erases its concrete type (`Arc<B>` -> `Arc<dyn Backend<T>>`).
    let shared = OnlineClientInner {
        config,
        genesis_hash,
        backend,
    };
    Ok(OnlineClient {
        inner: Arc::new(shared),
    })
}
/// Return the genesis hash that was resolved when this client was constructed.
pub fn genesis_hash(&self) -> HashFor<T> {
    let shared = &self.inner;
    shared.genesis_hash
}
/// Obtain a [`TransactionsClient`] that works at the current block, as determined by
/// [`OnlineClient::at_current_block`].
///
/// # Errors
///
/// Returns any error produced while instantiating the client at the current block.
pub async fn transactions(
    &self,
) -> Result<TransactionsClient<T, OnlineClientAtBlockImpl<T>>, OnlineClientAtBlockError> {
    self.at_current_block()
        .await
        .map(|at_block| at_block.transactions())
}
/// Shorthand for [`OnlineClient::transactions`].
pub async fn tx(
    &self,
) -> Result<TransactionsClient<T, OnlineClientAtBlockImpl<T>>, OnlineClientAtBlockError> {
    let transactions = self.transactions().await?;
    Ok(transactions)
}
/// Subscribe to the backend's "all block headers" stream and wrap it in a [`Blocks`] stream.
///
/// The hasher handed to the backend is cloned from a client instantiated at the current block.
///
/// # Errors
///
/// Returns [`BlocksError::CannotGetCurrentBlock`] if the client at the current block cannot be
/// instantiated, or [`BlocksError::CannotGetBlockHeaderStream`] if the backend cannot provide
/// the header stream.
pub async fn stream_all_blocks(&self) -> Result<Blocks<T>, BlocksError> {
    // Kept alive until the function returns, exactly like the hasher's source always was.
    let at_current = self
        .at_current_block()
        .await
        .map_err(BlocksError::CannotGetCurrentBlock)?;
    let backend = &self.inner.backend;
    let headers = backend
        .stream_all_block_headers(at_current.client.hasher.clone())
        .await
        .map_err(BlocksError::CannotGetBlockHeaderStream)?;
    Ok(Blocks::from_headers_stream(self.clone(), headers))
}
/// Subscribe to the backend's "best block headers" stream and wrap it in a [`Blocks`] stream.
///
/// The hasher handed to the backend is cloned from a client instantiated at the current block.
///
/// # Errors
///
/// Returns [`BlocksError::CannotGetCurrentBlock`] if the client at the current block cannot be
/// instantiated, or [`BlocksError::CannotGetBlockHeaderStream`] if the backend cannot provide
/// the header stream.
pub async fn stream_best_blocks(&self) -> Result<Blocks<T>, BlocksError> {
    // Kept alive until the function returns, exactly like the hasher's source always was.
    let at_current = self
        .at_current_block()
        .await
        .map_err(BlocksError::CannotGetCurrentBlock)?;
    let backend = &self.inner.backend;
    let headers = backend
        .stream_best_block_headers(at_current.client.hasher.clone())
        .await
        .map_err(BlocksError::CannotGetBlockHeaderStream)?;
    Ok(Blocks::from_headers_stream(self.clone(), headers))
}
/// Subscribe to the backend's "finalized block headers" stream and wrap it in a [`Blocks`]
/// stream.
///
/// The hasher handed to the backend is cloned from a client instantiated at the current block.
///
/// # Errors
///
/// Returns [`BlocksError::CannotGetCurrentBlock`] if the client at the current block cannot be
/// instantiated, or [`BlocksError::CannotGetBlockHeaderStream`] if the backend cannot provide
/// the header stream.
pub async fn stream_blocks(&self) -> Result<Blocks<T>, BlocksError> {
    // Kept alive until the function returns, exactly like the hasher's source always was.
    let at_current = self
        .at_current_block()
        .await
        .map_err(BlocksError::CannotGetCurrentBlock)?;
    let backend = &self.inner.backend;
    let headers = backend
        .stream_finalized_block_headers(at_current.client.hasher.clone())
        .await
        .map_err(BlocksError::CannotGetBlockHeaderStream)?;
    Ok(Blocks::from_headers_stream(self.clone(), headers))
}
/// Instantiate a client at the "current" block, which here means the latest finalized block
/// reported by the backend.
///
/// # Errors
///
/// Returns [`OnlineClientAtBlockError::CannotGetCurrentBlock`] if the backend cannot report
/// its latest finalized block, or any error from [`OnlineClient::at_block`].
pub async fn at_current_block(
    &self,
) -> Result<ClientAtBlock<T, OnlineClientAtBlockImpl<T>>, OnlineClientAtBlockError> {
    let finalized = self.inner.backend.latest_finalized_block_ref().await;
    let finalized_ref =
        finalized.map_err(|reason| OnlineClientAtBlockError::CannotGetCurrentBlock { reason })?;
    self.at_block(finalized_ref).await
}
/// Instantiate a client at the block identified by either a block number or a block reference.
///
/// Whichever half is missing is looked up through the backend: a block reference has its
/// header fetched to learn the block number, and a block number is resolved to a block
/// reference. Both are then passed to [`OnlineClient::at_block_hash_and_number`].
///
/// # Errors
///
/// - [`OnlineClientAtBlockError::CannotGetBlockHeader`] / `BlockHeaderNotFound` when given a
///   block reference whose header cannot be fetched or does not exist.
/// - [`OnlineClientAtBlockError::CannotGetBlockHash`] / `BlockNotFound` when given a block
///   number that cannot be resolved or does not exist.
/// - Any error from [`OnlineClient::at_block_hash_and_number`].
pub async fn at_block(
    &self,
    number_or_hash: impl Into<BlockNumberOrRef<T>>,
) -> Result<ClientAtBlock<T, OnlineClientAtBlockImpl<T>>, OnlineClientAtBlockError> {
    let target: BlockNumberOrRef<T> = number_or_hash.into();
    let backend = &self.inner.backend;

    let (block_ref, block_number) = match target {
        BlockNumberOrRef::BlockRef(block_ref) => {
            let block_hash = block_ref.hash();
            let maybe_header = backend.block_header(block_hash).await.map_err(|reason| {
                OnlineClientAtBlockError::CannotGetBlockHeader {
                    block_hash: block_hash.into(),
                    reason,
                }
            })?;
            match maybe_header {
                Some(header) => (block_ref, header.number()),
                None => {
                    return Err(OnlineClientAtBlockError::BlockHeaderNotFound {
                        block_hash: block_hash.into(),
                    });
                }
            }
        }
        BlockNumberOrRef::Number(block_number) => {
            let maybe_ref = backend
                .block_number_to_hash(block_number)
                .await
                .map_err(|reason| OnlineClientAtBlockError::CannotGetBlockHash {
                    block_number,
                    reason,
                })?;
            match maybe_ref {
                Some(block_ref) => (block_ref, block_number),
                None => return Err(OnlineClientAtBlockError::BlockNotFound { block_number }),
            }
        }
    };

    self.at_block_hash_and_number(block_ref, block_number).await
}
/// Instantiate a client at the block with the given reference and number.
///
/// No check is made here that `block_number` really belongs to `block_ref`; the number is
/// only used to ask the config for known versions and is stored on the resulting client.
///
/// The steps are:
/// 1. Resolve the spec and transaction version: from the config if it knows them for this
///    block number, else by calling `Core_version` at the block and decoding the response.
/// 2. Resolve the metadata: from the config if it has metadata for the spec version, else by
///    fetching it at the block, converting it to [`Metadata`] and handing it to the config.
/// 3. Build the hasher from the metadata and assemble the [`ClientAtBlock`].
///
/// # Errors
///
/// - `CannotGetSpecVersion` / `CannotDecodeSpecVersion` if `Core_version` fails or its
///   response cannot be decoded in either supported layout.
/// - `UnsupportedMetadataVersion` for metadata versions V0 to V7.
/// - `CannotConvertLegacyMetadata` / `CannotConvertModernMetadata` if conversion to
///   [`Metadata`] fails.
/// - Any error from `get_metadata` or `get_legacy_types`.
pub async fn at_block_hash_and_number(
&self,
block_ref: impl Into<BlockRef<HashFor<T>>>,
block_number: u64,
) -> Result<ClientAtBlock<T, OnlineClientAtBlockImpl<T>>, OnlineClientAtBlockError> {
let block_ref = block_ref.into();
let block_hash = block_ref.hash();
// Prefer versions the config already knows for this block number; this avoids a runtime call.
let (spec_version, transaction_version) = match self
.inner
.config
.spec_and_transaction_version_for_block_number(block_number)
{
Some(version) => version,
None => {
let spec_version_bytes = self
.inner
.backend
.call("Core_version", None, block_hash)
.await
.map_err(|e| OnlineClientAtBlockError::CannotGetSpecVersion {
block_hash: block_hash.into(),
reason: e,
})?;
// Layout of the version information including a transaction version. Only
// `spec_version` and `transaction_version` are read; the underscore-prefixed
// fields exist so that the bytes before them are consumed while decoding.
#[derive(codec::Decode, Debug)]
struct SpecVersionHeader {
_spec_name: String,
_impl_name: String,
_authoring_version: u32,
spec_version: u32,
_impl_version: u32,
_apis: Vec<([u8; 8], u32)>,
transaction_version: u32,
}
// Fallback layout that stops before the transaction version.
#[derive(codec::Decode, Debug)]
struct OldSpecVersionHeader {
_spec_name: String,
_impl_name: String,
_authoring_version: u32,
spec_version: u32,
_impl_version: u32,
_apis: Vec<([u8; 8], u32)>,
}
// Try the newer layout first. If that fails, retry with the older layout and report a
// transaction version of 0. If both fail, the error from the older layout is returned.
SpecVersionHeader::decode(&mut &spec_version_bytes[..])
.map(|version| (version.spec_version, version.transaction_version))
.or_else(|_| {
OldSpecVersionHeader::decode(&mut &spec_version_bytes[..])
.map(|version| (version.spec_version, 0))
})
.map_err(|e| OnlineClientAtBlockError::CannotDecodeSpecVersion {
block_hash: block_hash.into(),
reason: e,
})?
}
};
// Prefer metadata the config already holds for this spec version; otherwise fetch it.
let metadata = match self.inner.config.metadata_for_spec_version(spec_version) {
Some(metadata) => metadata,
None => {
let metadata: Metadata =
match get_metadata(&*self.inner.backend, block_hash).await? {
// V0 to V7 are rejected outright.
m @ RuntimeMetadata::V0(_)
| m @ RuntimeMetadata::V1(_)
| m @ RuntimeMetadata::V2(_)
| m @ RuntimeMetadata::V3(_)
| m @ RuntimeMetadata::V4(_)
| m @ RuntimeMetadata::V5(_)
| m @ RuntimeMetadata::V6(_)
| m @ RuntimeMetadata::V7(_) => {
return Err(OnlineClientAtBlockError::UnsupportedMetadataVersion {
block_hash: block_hash.into(),
version: m.version(),
});
}
// V8 to V13 are converted with the help of a legacy type registry, built from the
// config's legacy types for this spec version plus types found in the metadata.
RuntimeMetadata::V8(m) => {
let types = get_legacy_types(self, &m, spec_version)?;
Metadata::from_v8(&m, &types).map_err(|e| {
OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(),
metadata_version: 8,
reason: e,
}
})?
}
RuntimeMetadata::V9(m) => {
let types = get_legacy_types(self, &m, spec_version)?;
Metadata::from_v9(&m, &types).map_err(|e| {
OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(),
metadata_version: 9,
reason: e,
}
})?
}
RuntimeMetadata::V10(m) => {
let types = get_legacy_types(self, &m, spec_version)?;
Metadata::from_v10(&m, &types).map_err(|e| {
OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(),
metadata_version: 10,
reason: e,
}
})?
}
RuntimeMetadata::V11(m) => {
let types = get_legacy_types(self, &m, spec_version)?;
Metadata::from_v11(&m, &types).map_err(|e| {
OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(),
metadata_version: 11,
reason: e,
}
})?
}
RuntimeMetadata::V12(m) => {
let types = get_legacy_types(self, &m, spec_version)?;
Metadata::from_v12(&m, &types).map_err(|e| {
OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(),
metadata_version: 12,
reason: e,
}
})?
}
RuntimeMetadata::V13(m) => {
let types = get_legacy_types(self, &m, spec_version)?;
Metadata::from_v13(&m, &types).map_err(|e| {
OnlineClientAtBlockError::CannotConvertLegacyMetadata {
block_hash: block_hash.into(),
metadata_version: 13,
reason: e,
}
})?
}
// V14 to V16 are converted directly, without a legacy type registry.
RuntimeMetadata::V14(m) => Metadata::from_v14(m).map_err(|e| {
OnlineClientAtBlockError::CannotConvertModernMetadata {
block_hash: block_hash.into(),
metadata_version: 14,
reason: e,
}
})?,
RuntimeMetadata::V15(m) => Metadata::from_v15(m).map_err(|e| {
OnlineClientAtBlockError::CannotConvertModernMetadata {
block_hash: block_hash.into(),
metadata_version: 15,
reason: e,
}
})?,
RuntimeMetadata::V16(m) => Metadata::from_v16(m).map_err(|e| {
OnlineClientAtBlockError::CannotConvertModernMetadata {
block_hash: block_hash.into(),
metadata_version: 16,
reason: e,
}
})?,
};
let metadata = Arc::new(metadata);
// Hand the freshly converted metadata to the config, keyed by spec version, so that the
// lookup above can find it next time.
self.inner
.config
.set_metadata_for_spec_version(spec_version, metadata.clone());
metadata
}
};
let online_client_at_block = OnlineClientAtBlockImpl {
client: self.clone(),
// The hasher is constructed from the metadata resolved for this block.
hasher: <T::Hasher as Hasher>::new(&metadata),
metadata,
block_ref,
block_number,
spec_version,
transaction_version,
};
Ok(ClientAtBlock {
client: online_client_at_block,
marker: PhantomData,
})
}
}
/// The capabilities that a client working at a specific block has when it is backed by a
/// live connection, on top of those provided by [`OfflineClientAtBlockT`].
///
/// Hidden from the generated documentation.
#[doc(hidden)]
pub trait OnlineClientAtBlockT<T: Config>: OfflineClientAtBlockT<T> {
/// The backend through which chain requests are made.
fn backend(&self) -> &dyn Backend<T>;
/// A reference to the block that this client is working at.
fn block_ref(&self) -> &BlockRef<HashFor<T>>;
/// An owned handle to the underlying [`OnlineClient`].
fn client(&self) -> OnlineClient<T>;
}
/// The state behind a [`ClientAtBlock`] created from an [`OnlineClient`]: the client itself
/// plus everything that was resolved for one specific block.
#[derive(Debug, Clone)]
pub struct OnlineClientAtBlockImpl<T: Config> {
/// The client (and thereby the backend and config) this was created from.
client: OnlineClient<T>,
/// The metadata resolved for the spec version in use at the block.
metadata: ArcMetadata,
/// The hasher, constructed from `metadata`.
hasher: T::Hasher,
/// A reference to the block.
block_ref: BlockRef<HashFor<T>>,
/// The number of the block.
block_number: u64,
/// The spec version resolved for the block.
spec_version: u32,
/// The transaction version resolved for the block. This is 0 when the runtime's version
/// information could only be decoded in the layout that has no transaction version.
transaction_version: u32,
}
impl<T: Config> OnlineClientAtBlockT<T> for OnlineClientAtBlockImpl<T> {
    /// Borrow the backend owned by the underlying [`OnlineClient`].
    fn backend(&self) -> &dyn Backend<T> {
        self.client.inner.backend.as_ref()
    }

    /// Borrow the reference to the block this client works at.
    fn block_ref(&self) -> &BlockRef<HashFor<T>> {
        let Self { block_ref, .. } = self;
        block_ref
    }

    /// Return a clone of the underlying [`OnlineClient`] handle.
    fn client(&self) -> OnlineClient<T> {
        Clone::clone(&self.client)
    }
}
impl<T: Config> OfflineClientAtBlockT<T> for OnlineClientAtBlockImpl<T> {
fn metadata_ref(&self) -> &Metadata {
&self.metadata
}
fn metadata(&self) -> ArcMetadata {
self.metadata.clone()
}
fn block_number(&self) -> u64 {
self.block_number
}
fn genesis_hash(&self) -> Option<HashFor<T>> {
Some(self.client.inner.genesis_hash)
}
fn spec_version(&self) -> u32 {
self.spec_version
}
fn transaction_version(&self) -> u32 {
self.transaction_version
}
fn hasher(&self) -> &T::Hasher {
&self.hasher
}
}
/// Build the type registry set used to convert legacy metadata.
///
/// The config's legacy types for `spec_version` form the base; a registry derived from
/// `metadata` itself is then prepended to it.
///
/// # Errors
///
/// - [`OnlineClientAtBlockError::MissingLegacyTypes`] if the config has no legacy types for
///   this spec version.
/// - [`OnlineClientAtBlockError::CannotInjectMetadataTypes`] if no type registry can be derived
///   from the metadata.
fn get_legacy_types<'a, T: Config, Md: ToTypeRegistry>(
    client: &'a OnlineClient<T>,
    metadata: &Md,
    spec_version: u32,
) -> Result<TypeRegistrySet<'a>, OnlineClientAtBlockError> {
    let config = &client.inner.config;
    let mut registry_set = match config.legacy_types_for_spec_version(spec_version) {
        Some(base) => base,
        None => return Err(OnlineClientAtBlockError::MissingLegacyTypes),
    };

    match frame_decode::helpers::type_registry_from_metadata(metadata) {
        Ok(from_metadata) => registry_set.prepend(from_metadata),
        Err(parse_error) => {
            return Err(OnlineClientAtBlockError::CannotInjectMetadataTypes { parse_error });
        }
    }

    Ok(registry_set)
}
/// Fetch the runtime metadata at the given block.
///
/// `Metadata_metadata_versions` is called first. If that call succeeds and decodes, the highest
/// version it lists (ignoring `u32::MAX`) is requested through `Metadata_metadata_at_version`.
/// If it fails, cannot be decoded, or lists no usable version, `Metadata_metadata` is called
/// instead.
///
/// # Errors
///
/// Returns [`OnlineClientAtBlockError::CannotGetMetadata`] if the metadata call that ends up
/// being used fails, if its response cannot be decoded, or if `Metadata_metadata_at_version`
/// returns nothing for the requested version.
async fn get_metadata<T: Config>(
    backend: &dyn Backend<T>,
    block_hash: HashFor<T>,
) -> Result<RuntimeMetadata, OnlineClientAtBlockError> {
    // Every failure below is reported through the same variant; only the reason text differs.
    let cannot_get = move |reason: String| OnlineClientAtBlockError::CannotGetMetadata {
        block_hash: block_hash.into(),
        reason,
    };

    // Any failure while listing versions is deliberately ignored: it just means that the
    // `Metadata_metadata` call is used further down.
    let listed_versions = match backend
        .call("Metadata_metadata_versions", None, block_hash)
        .await
    {
        Ok(res) => <Vec<u32>>::decode(&mut &res[..]).ok(),
        Err(_) => None,
    };
    let version_to_get =
        listed_versions.and_then(|versions| versions.into_iter().filter(|v| *v != u32::MAX).max());

    let prefixed: RuntimeMetadataPrefixed = match version_to_get {
        Some(version_to_get) => {
            let version_bytes = version_to_get.encode();
            let rpc_response = backend
                .call(
                    "Metadata_metadata_at_version",
                    Some(&version_bytes),
                    block_hash,
                )
                .await
                .map_err(|e| {
                    cannot_get(format!("Error calling Metadata_metadata_at_version: {e}"))
                })?;
            // The response is an optional (compact value, prefixed metadata) pair; only the
            // prefixed metadata is used.
            let decoded = <Option<(Compact<u32>, RuntimeMetadataPrefixed)>>::decode(
                &mut &rpc_response[..],
            )
            .map_err(|e| {
                cannot_get(format!("Error decoding response for Metadata_metadata_at_version: {e}"))
            })?;
            match decoded {
                Some((_, metadata)) => metadata,
                None => {
                    return Err(cannot_get(format!("No metadata returned for the latest version from Metadata_metadata_versions ({version_to_get})")));
                }
            }
        }
        None => {
            let metadata_bytes = backend
                .call("Metadata_metadata", None, block_hash)
                .await
                .map_err(|e| cannot_get(format!("Error calling Metadata_metadata: {e}")))?;
            let (_, metadata) =
                <(Compact<u32>, RuntimeMetadataPrefixed)>::decode(&mut &metadata_bytes[..])
                    .map_err(|e| {
                        cannot_get(format!("Error decoding response for Metadata_metadata: {e}"))
                    })?;
            metadata
        }
    };

    // The second field of the prefixed wrapper is the runtime metadata itself.
    Ok(prefixed.1)
}