use super::{OfflineClient, OfflineClientT};
use crate::{
backend::{legacy::LegacyBackend, rpc::RpcClient, Backend, BackendExt, StreamOfResults},
blocks::{BlockRef, BlocksClient},
config::{Config, HashFor},
constants::ConstantsClient,
custom_values::CustomValuesClient,
error::{BackendError, OnlineClientError, RuntimeUpdateeApplyError, RuntimeUpdaterError},
events::EventsClient,
runtime_api::RuntimeApiClient,
storage::StorageClient,
tx::TxClient,
view_functions::ViewFunctionsClient,
Metadata,
};
use derive_where::derive_where;
use futures::{future, TryFutureExt};
use pezkuwi_subxt_core::client::{ClientState, RuntimeVersion};
use std::sync::{Arc, RwLock};
pub trait OnlineClientT<T: Config>: OfflineClientT<T> {
fn backend(&self) -> &dyn Backend<T>;
}
#[derive_where(Clone)]
pub struct OnlineClient<T: Config> {
inner: Arc<RwLock<Inner<T>>>,
backend: Arc<dyn Backend<T>>,
}
#[derive_where(Debug)]
struct Inner<T: Config> {
genesis_hash: HashFor<T>,
runtime_version: RuntimeVersion,
metadata: Metadata,
hasher: T::Hasher,
}
impl<T: Config> std::fmt::Debug for OnlineClient<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Client")
.field("rpc", &"RpcClient")
.field("inner", &self.inner)
.finish()
}
}
#[cfg(feature = "jsonrpsee")]
#[cfg_attr(docsrs, doc(cfg(feature = "jsonrpsee")))]
impl<T: Config> OnlineClient<T> {
pub async fn new() -> Result<OnlineClient<T>, OnlineClientError> {
let url = "ws://127.0.0.1:9944";
OnlineClient::from_url(url).await
}
pub async fn from_url(url: impl AsRef<str>) -> Result<OnlineClient<T>, OnlineClientError> {
pezkuwi_subxt_rpcs::utils::validate_url_is_secure(url.as_ref())?;
OnlineClient::from_insecure_url(url).await
}
pub async fn from_insecure_url(
url: impl AsRef<str>,
) -> Result<OnlineClient<T>, OnlineClientError> {
let client = RpcClient::from_insecure_url(url).await?;
let backend = LegacyBackend::builder().build(client);
OnlineClient::from_backend(Arc::new(backend)).await
}
}
impl<T: Config> OnlineClient<T> {
pub async fn from_rpc_client(
rpc_client: impl Into<RpcClient>,
) -> Result<OnlineClient<T>, OnlineClientError> {
let rpc_client = rpc_client.into();
let backend = Arc::new(LegacyBackend::builder().build(rpc_client));
OnlineClient::from_backend(backend).await
}
pub fn from_rpc_client_with(
genesis_hash: HashFor<T>,
runtime_version: RuntimeVersion,
metadata: impl Into<Metadata>,
rpc_client: impl Into<RpcClient>,
) -> Result<OnlineClient<T>, OnlineClientError> {
let rpc_client = rpc_client.into();
let backend = Arc::new(LegacyBackend::builder().build(rpc_client));
OnlineClient::from_backend_with(genesis_hash, runtime_version, metadata, backend)
}
pub async fn from_backend<B: Backend<T>>(
backend: Arc<B>,
) -> Result<OnlineClient<T>, OnlineClientError> {
let latest_block = backend
.latest_finalized_block_ref()
.await
.map_err(OnlineClientError::CannotGetLatestFinalizedBlock)?;
let (genesis_hash, runtime_version, metadata) = future::join3(
backend.genesis_hash().map_err(OnlineClientError::CannotGetGenesisHash),
backend
.current_runtime_version()
.map_err(OnlineClientError::CannotGetCurrentRuntimeVersion),
OnlineClient::fetch_metadata(&*backend, latest_block.hash())
.map_err(OnlineClientError::CannotFetchMetadata),
)
.await;
OnlineClient::from_backend_with(genesis_hash?, runtime_version?, metadata?, backend)
}
pub fn from_backend_with<B: Backend<T>>(
genesis_hash: HashFor<T>,
runtime_version: RuntimeVersion,
metadata: impl Into<Metadata>,
backend: Arc<B>,
) -> Result<OnlineClient<T>, OnlineClientError> {
use pezkuwi_subxt_core::config::Hasher;
let metadata = metadata.into();
let hasher = T::Hasher::new(&metadata);
Ok(OnlineClient {
inner: Arc::new(RwLock::new(Inner { genesis_hash, runtime_version, metadata, hasher })),
backend,
})
}
async fn fetch_metadata(
backend: &dyn Backend<T>,
block_hash: HashFor<T>,
) -> Result<Metadata, BackendError> {
#[cfg(feature = "unstable-metadata")]
{
const UNSTABLE_METADATA_VERSION: u32 = u32::MAX;
match backend.metadata_at_version(UNSTABLE_METADATA_VERSION, block_hash).await {
Ok(bytes) => Ok(bytes),
Err(_) => OnlineClient::fetch_latest_stable_metadata(backend, block_hash).await,
}
}
#[cfg(not(feature = "unstable-metadata"))]
OnlineClient::fetch_latest_stable_metadata(backend, block_hash).await
}
async fn fetch_latest_stable_metadata(
backend: &dyn Backend<T>,
block_hash: HashFor<T>,
) -> Result<Metadata, BackendError> {
use pezkuwi_subxt_metadata::SUPPORTED_METADATA_VERSIONS;
for version in SUPPORTED_METADATA_VERSIONS {
if let Ok(bytes) = backend.metadata_at_version(version, block_hash).await {
return Ok(bytes);
}
}
backend.legacy_metadata(block_hash).await
}
pub fn updater(&self) -> ClientRuntimeUpdater<T> {
ClientRuntimeUpdater(self.clone())
}
pub fn hasher(&self) -> T::Hasher {
self.inner.read().expect("shouldn't be poisoned").hasher
}
pub fn metadata(&self) -> Metadata {
let inner = self.inner.read().expect("shouldn't be poisoned");
inner.metadata.clone()
}
pub fn set_metadata(&self, metadata: impl Into<Metadata>) {
let mut inner = self.inner.write().expect("shouldn't be poisoned");
inner.metadata = metadata.into();
}
pub fn genesis_hash(&self) -> HashFor<T> {
let inner = self.inner.read().expect("shouldn't be poisoned");
inner.genesis_hash
}
pub fn set_genesis_hash(&self, genesis_hash: HashFor<T>) {
let mut inner = self.inner.write().expect("shouldn't be poisoned");
inner.genesis_hash = genesis_hash;
}
pub fn runtime_version(&self) -> RuntimeVersion {
let inner = self.inner.read().expect("shouldn't be poisoned");
inner.runtime_version
}
pub fn set_runtime_version(&self, runtime_version: RuntimeVersion) {
let mut inner = self.inner.write().expect("shouldn't be poisoned");
inner.runtime_version = runtime_version;
}
pub fn backend(&self) -> &dyn Backend<T> {
&*self.backend
}
pub fn offline(&self) -> OfflineClient<T> {
let inner = self.inner.read().expect("shouldn't be poisoned");
OfflineClient::new(inner.genesis_hash, inner.runtime_version, inner.metadata.clone())
}
pub fn tx(&self) -> TxClient<T, Self> {
<Self as OfflineClientT<T>>::tx(self)
}
pub fn events(&self) -> EventsClient<T, Self> {
<Self as OfflineClientT<T>>::events(self)
}
pub fn storage(&self) -> StorageClient<T, Self> {
<Self as OfflineClientT<T>>::storage(self)
}
pub fn constants(&self) -> ConstantsClient<T, Self> {
<Self as OfflineClientT<T>>::constants(self)
}
pub fn blocks(&self) -> BlocksClient<T, Self> {
<Self as OfflineClientT<T>>::blocks(self)
}
pub fn runtime_api(&self) -> RuntimeApiClient<T, Self> {
<Self as OfflineClientT<T>>::runtime_api(self)
}
pub fn view_functions(&self) -> ViewFunctionsClient<T, Self> {
<Self as OfflineClientT<T>>::view_functions(self)
}
pub fn custom_values(&self) -> CustomValuesClient<T, Self> {
<Self as OfflineClientT<T>>::custom_values(self)
}
}
impl<T: Config> OfflineClientT<T> for OnlineClient<T> {
fn metadata(&self) -> Metadata {
self.metadata()
}
fn genesis_hash(&self) -> HashFor<T> {
self.genesis_hash()
}
fn runtime_version(&self) -> RuntimeVersion {
self.runtime_version()
}
fn hasher(&self) -> T::Hasher {
self.hasher()
}
fn client_state(&self) -> ClientState<T> {
let inner = self.inner.read().expect("shouldn't be poisoned");
ClientState {
genesis_hash: inner.genesis_hash,
runtime_version: inner.runtime_version,
metadata: inner.metadata.clone(),
}
}
}
impl<T: Config> OnlineClientT<T> for OnlineClient<T> {
fn backend(&self) -> &dyn Backend<T> {
&*self.backend
}
}
pub struct ClientRuntimeUpdater<T: Config>(OnlineClient<T>);
impl<T: Config> ClientRuntimeUpdater<T> {
fn is_runtime_version_different(&self, new: &RuntimeVersion) -> bool {
let curr = self.0.inner.read().expect("shouldn't be poisoned");
&curr.runtime_version != new
}
fn do_update(&self, update: Update) {
let mut writable = self.0.inner.write().expect("shouldn't be poisoned");
writable.metadata = update.metadata;
writable.runtime_version = update.runtime_version;
}
pub fn apply_update(&self, update: Update) -> Result<(), RuntimeUpdateeApplyError> {
if !self.is_runtime_version_different(&update.runtime_version) {
return Err(RuntimeUpdateeApplyError::SameVersion);
}
self.do_update(update);
Ok(())
}
pub async fn perform_runtime_updates(&self) -> Result<(), RuntimeUpdaterError> {
let mut runtime_version_stream = self.runtime_updates().await?;
loop {
let update = runtime_version_stream.next().await?;
let _ = self.apply_update(update);
}
}
pub async fn runtime_updates(&self) -> Result<RuntimeUpdaterStream<T>, RuntimeUpdaterError> {
let stream = self
.0
.backend()
.stream_runtime_version()
.await
.map_err(RuntimeUpdaterError::CannotStreamRuntimeVersion)?;
Ok(RuntimeUpdaterStream { stream, client: self.0.clone() })
}
}
pub struct RuntimeUpdaterStream<T: Config> {
stream: StreamOfResults<RuntimeVersion>,
client: OnlineClient<T>,
}
impl<T: Config> RuntimeUpdaterStream<T> {
pub async fn next(&mut self) -> Result<Update, RuntimeUpdaterError> {
let runtime_version = self
.stream
.next()
.await
.ok_or(RuntimeUpdaterError::UnexpectedEndOfUpdateStream)?
.map_err(RuntimeUpdaterError::CannotGetNextRuntimeVersion)?;
let at = wait_runtime_upgrade_in_finalized_block(&self.client, &runtime_version).await?;
let metadata = OnlineClient::fetch_metadata(self.client.backend(), at.hash())
.await
.map_err(RuntimeUpdaterError::CannotFetchNewMetadata)?;
Ok(Update { metadata, runtime_version })
}
}
pub struct Update {
runtime_version: RuntimeVersion,
metadata: Metadata,
}
impl Update {
pub fn runtime_version(&self) -> &RuntimeVersion {
&self.runtime_version
}
pub fn metadata(&self) -> &Metadata {
&self.metadata
}
}
async fn wait_runtime_upgrade_in_finalized_block<T: Config>(
client: &OnlineClient<T>,
runtime_version: &RuntimeVersion,
) -> Result<BlockRef<HashFor<T>>, RuntimeUpdaterError> {
let hasher = client.inner.read().expect("Lock shouldn't be poisoned").hasher;
let mut block_sub = client
.backend()
.stream_finalized_block_headers(hasher)
.await
.map_err(RuntimeUpdaterError::CannotStreamFinalizedBlocks)?;
let block_ref = loop {
let (_, block_ref) = block_sub
.next()
.await
.ok_or(RuntimeUpdaterError::UnexpectedEndOfBlockStream)?
.map_err(RuntimeUpdaterError::CannotGetNextFinalizedBlock)?;
let addr =
crate::dynamic::storage::<(), scale_value::Value>("System", "LastRuntimeUpgrade");
let client_at = client.storage().at(block_ref.hash());
let value = client_at
.entry(addr)
.map_err(|_| RuntimeUpdaterError::CantFindSystemLastRuntimeUpgrade)?
.fetch(())
.await
.map_err(RuntimeUpdaterError::CantFetchLastRuntimeUpgrade)?
.decode_as::<LastRuntimeUpgrade>()
.map_err(RuntimeUpdaterError::CannotDecodeLastRuntimeUpgrade)?;
#[derive(scale_decode::DecodeAsType)]
struct LastRuntimeUpgrade {
spec_version: u32,
}
if value.spec_version == runtime_version.spec_version {
break block_ref;
}
};
Ok(block_ref)
}