pub mod chain_head;
pub mod legacy;
pub mod utils;
use crate::{
config::{Config, HashFor},
error::BackendError,
};
use async_trait::async_trait;
use codec::{Decode, Encode};
use futures::{Stream, StreamExt};
use pezkuwi_subxt_core::client::RuntimeVersion;
use pezkuwi_subxt_metadata::Metadata;
use std::{pin::Pin, sync::Arc};
/// RPC types and client re-exported from the underlying RPC crate, so that
/// users of this module don't need to depend on it directly.
pub mod rpc {
pub use pezkuwi_subxt_rpcs::{
client::{RawRpcFuture, RawRpcSubscription, RawValue, RpcParams},
rpc_params, RpcClient, RpcClientT,
};
// Only re-export the reconnecting client when its feature is enabled.
crate::macros::cfg_reconnecting_rpc_client! {
pub use pezkuwi_subxt_rpcs::client::reconnecting_rpc_client;
}
}
#[doc(hidden)]
pub(crate) mod sealed {
// Sealing trait: `Backend` requires `sealed::Sealed`, and since this module
// is crate-private, no external crate can implement `Backend`.
pub trait Sealed {}
}
/// An interface to the chain that the rest of this crate is built on top of.
///
/// Sealed: only implementations within this crate can implement it.
#[async_trait]
pub trait Backend<T: Config>: sealed::Sealed + Send + Sync + 'static {
/// Fetch the values under the given storage keys at the given block hash,
/// streamed back as [`StorageResponse`] items.
async fn storage_fetch_values(
&self,
keys: Vec<Vec<u8>>,
at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError>;
/// Stream the storage keys that descend from the given key prefix at the
/// given block hash.
async fn storage_fetch_descendant_keys(
&self,
key: Vec<u8>,
at: HashFor<T>,
) -> Result<StreamOfResults<Vec<u8>>, BackendError>;
/// Stream key/value pairs for storage entries descending from the given
/// key prefix at the given block hash.
async fn storage_fetch_descendant_values(
&self,
key: Vec<u8>,
at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError>;
/// Fetch the genesis block hash.
async fn genesis_hash(&self) -> Result<HashFor<T>, BackendError>;
/// Fetch the header for the block with the given hash, if it exists.
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError>;
/// Fetch the block body (encoded extrinsics) for the given block hash,
/// if it exists.
async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, BackendError>;
/// Get a reference to the most recent finalized block.
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, BackendError>;
/// Fetch the current runtime version.
async fn current_runtime_version(&self) -> Result<RuntimeVersion, BackendError>;
/// Stream runtime version updates.
async fn stream_runtime_version(&self)
-> Result<StreamOfResults<RuntimeVersion>, BackendError>;
/// Stream headers for all new blocks, paired with a [`BlockRef`] to each.
async fn stream_all_block_headers(
&self,
hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError>;
/// Stream headers for best blocks, paired with a [`BlockRef`] to each.
async fn stream_best_block_headers(
&self,
hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError>;
/// Stream headers for finalized blocks, paired with a [`BlockRef`] to each.
async fn stream_finalized_block_headers(
&self,
hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError>;
/// Submit an encoded transaction and stream back status events for it.
async fn submit_transaction(
&self,
bytes: &[u8],
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, BackendError>;
/// Make a runtime API call at the given block hash, returning the raw
/// encoded response bytes.
async fn call(
&self,
method: &str,
call_parameters: Option<&[u8]>,
at: HashFor<T>,
) -> Result<Vec<u8>, BackendError>;
}
/// Convenience helpers layered on top of any [`Backend`].
///
/// All methods here are default implementations built purely from the
/// required [`Backend`] methods, so every backend gets them for free via the
/// blanket impl below.
#[async_trait]
pub trait BackendExt<T: Config>: Backend<T> {
    /// Fetch the raw value stored under a single `key` at block `at`.
    ///
    /// Returns `Ok(None)` when nothing is stored under the key.
    async fn storage_fetch_value(
        &self,
        key: Vec<u8>,
        at: HashFor<T>,
    ) -> Result<Option<Vec<u8>>, BackendError> {
        // Delegate to the multi-key fetch and take the first (only) item.
        self.storage_fetch_values(vec![key], at)
            .await?
            .next()
            .await
            .transpose()
            .map(|o| o.map(|s| s.value))
    }
    /// Make a runtime API call and SCALE-decode the response into `D`.
    async fn call_decoding<D: codec::Decode>(
        &self,
        method: &str,
        call_parameters: Option<&[u8]>,
        at: HashFor<T>,
    ) -> Result<D, BackendError> {
        let bytes = self.call(method, call_parameters, at).await?;
        let res =
            D::decode(&mut &*bytes).map_err(BackendError::CouldNotScaleDecodeRuntimeResponse)?;
        Ok(res)
    }
    /// Fetch metadata at a specific metadata `version` via the
    /// `Metadata_metadata_at_version` runtime API.
    ///
    /// # Errors
    ///
    /// Returns [`BackendError::MetadataVersionNotFound`] if the runtime does
    /// not expose metadata at the requested version, and decode errors if the
    /// response cannot be decoded.
    async fn metadata_at_version(
        &self,
        version: u32,
        at: HashFor<T>,
    ) -> Result<Metadata, BackendError> {
        let param = version.encode();
        // Fix: this argument was corrupted to `Some(¶m)` (HTML-entity
        // mojibake for `&param`), which does not compile.
        let opaque: Option<frame_metadata::OpaqueMetadata> =
            self.call_decoding("Metadata_metadata_at_version", Some(&param), at).await?;
        let Some(opaque) = opaque else {
            return Err(BackendError::MetadataVersionNotFound(version));
        };
        let metadata: Metadata =
            Decode::decode(&mut &opaque.0[..]).map_err(BackendError::CouldNotDecodeMetadata)?;
        Ok(metadata)
    }
    /// Fetch metadata via the legacy `Metadata_metadata` runtime API.
    async fn legacy_metadata(&self, at: HashFor<T>) -> Result<Metadata, BackendError> {
        let opaque: frame_metadata::OpaqueMetadata =
            self.call_decoding("Metadata_metadata", None, at).await?;
        let metadata: Metadata =
            Decode::decode(&mut &opaque.0[..]).map_err(BackendError::CouldNotDecodeMetadata)?;
        Ok(metadata)
    }
}
// Every `Backend` (sized or not) automatically gets the `BackendExt` helpers.
#[async_trait]
impl<B, T> BackendExt<T> for B
where
    B: Backend<T> + ?Sized,
    T: Config,
{
}
/// A reference to a block hash.
///
/// May additionally carry an opaque pointer; see `_pointer` below.
#[derive(Clone)]
pub struct BlockRef<H> {
// The hash of the referenced block.
hash: H,
// Never read directly; presumably held so that dropping the last clone of
// this `BlockRef` releases backend-side resources tied to the block
// (e.g. unpinning it) — TODO confirm against the backend impls.
_pointer: Option<Arc<dyn BlockRefT>>,
}
impl<H> From<H> for BlockRef<H> {
fn from(value: H) -> Self {
BlockRef::from_hash(value)
}
}
impl<H: PartialEq> PartialEq for BlockRef<H> {
    /// Equality considers only the hash; the optional backing pointer is
    /// deliberately ignored.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.hash, &other.hash)
    }
}
impl<H: Eq> Eq for BlockRef<H> {}
impl<H: PartialOrd> PartialOrd for BlockRef<H> {
    /// Ordering considers only the hash, mirroring the `PartialEq` impl.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        PartialOrd::partial_cmp(&self.hash, &other.hash)
    }
}
impl<H: Ord> Ord for BlockRef<H> {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.hash, &other.hash)
    }
}
// Manual Debug impl: shows only the hash, since the pointer field is an
// opaque `dyn BlockRefT` with no `Debug` bound (so `#[derive(Debug)]` would
// not compile here).
impl<H: std::fmt::Debug> std::fmt::Debug for BlockRef<H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("BlockRef").field(&self.hash).finish()
}
}
impl<H: std::hash::Hash> std::hash::Hash for BlockRef<H> {
    /// Hash only the block hash, consistent with the `PartialEq` impl
    /// (which also ignores the backing pointer).
    fn hash<Hasher: std::hash::Hasher>(&self, state: &mut Hasher) {
        std::hash::Hash::hash(&self.hash, state);
    }
}
impl<H> BlockRef<H> {
    /// Construct a `BlockRef` from a plain hash, with no backing pointer.
    pub fn from_hash(hash: H) -> Self {
        Self { _pointer: None, hash }
    }
    /// Construct a `BlockRef` from a hash plus a backing pointer that is
    /// kept alive for as long as any clone of this reference exists.
    pub fn new<P: BlockRefT>(hash: H, inner: P) -> Self {
        let pointer: Arc<dyn BlockRefT> = Arc::new(inner);
        Self { _pointer: Some(pointer), hash }
    }
    /// Return a copy of the block hash.
    pub fn hash(&self) -> H
    where
        H: Copy,
    {
        self.hash
    }
}
/// Marker trait for the opaque pointer a [`BlockRef`] can carry.
pub trait BlockRefT: Send + Sync + 'static {}
/// A concrete wrapper around a pinned, boxed stream of `T` items.
pub struct StreamOf<T>(Pin<Box<dyn Stream<Item = T> + Send + 'static>>);
impl<T> Stream for StreamOf<T> {
    type Item = T;
    /// Forward polling straight to the wrapped stream.
    fn poll_next(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // `StreamOf` only holds a `Pin<Box<..>>` and is therefore `Unpin`,
        // so we can reach the inner stream and poll it directly.
        self.get_mut().0.as_mut().poll_next(cx)
    }
}
// The inner stream is opaque, so Debug prints a fixed placeholder instead.
impl<T> std::fmt::Debug for StreamOf<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("StreamOf").field(&"<stream>").finish()
}
}
impl<T> StreamOf<T> {
    /// Wrap an already-pinned, boxed stream.
    pub fn new(inner: Pin<Box<dyn Stream<Item = T> + Send + 'static>>) -> Self {
        Self(inner)
    }
    /// Yield the next item, or `None` once the stream is exhausted.
    pub async fn next(&mut self) -> Option<T> {
        self.0.next().await
    }
}
/// A stream of `Result<T, BackendError>` items.
pub type StreamOfResults<T> = StreamOf<Result<T, BackendError>>;
/// The status of a submitted transaction, as reported by
/// [`Backend::submit_transaction`]'s result stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionStatus<Hash> {
/// The transaction was validated.
Validated,
/// The transaction was broadcast.
Broadcasted,
/// The transaction is no longer in a best block.
NoLongerInBestBlock,
/// The transaction was included in a best block.
InBestBlock {
/// The best block it was included in.
hash: BlockRef<Hash>,
},
/// The transaction was included in a finalized block.
InFinalizedBlock {
/// The finalized block it was included in.
hash: BlockRef<Hash>,
},
/// An error was encountered.
Error {
/// Human-readable error detail.
message: String,
},
/// The transaction is invalid.
Invalid {
/// Human-readable error detail.
message: String,
},
/// The transaction was dropped.
Dropped {
/// Human-readable error detail.
message: String,
},
}
/// A key/value pair returned from a storage query.
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Debug)]
pub struct StorageResponse {
/// The storage key.
pub key: Vec<u8>,
/// The raw value stored under that key.
pub value: Vec<u8>,
}
#[cfg(test)]
mod test {
use super::*;
use crate::backend::StorageResponse;
use core::convert::Infallible;
use futures::StreamExt;
use pezkuwi_subxt_core::{config::DefaultExtrinsicParams, Config};
use pezkuwi_subxt_rpcs::client::{
mock_rpc_client::{Json, MockRpcClientBuilder},
MockRpcClient,
};
use primitive_types::H256;
use rpc::RpcClientT;
use std::collections::{HashMap, VecDeque};
// Generate a random block hash; tests don't care about the concrete value.
fn random_hash() -> H256 {
H256::random()
}
// The transient error a reconnecting RPC client reports; the tests below
// expect backends to retry requests/subscriptions when they see it.
fn disconnected_will_reconnect() -> pezkuwi_subxt_rpcs::Error {
pezkuwi_subxt_rpcs::Error::DisconnectedWillReconnect("..".into())
}
/// Convenience constructor for a [`StorageResponse`] from anything that
/// converts into key/value byte vectors.
///
/// The original carried a redundant `where Vec<u8>: From<K>` clause; the
/// `K: Into<Vec<u8>>` bound already guarantees `key.into()` works (and `V`
/// never had an equivalent clause), so it is dropped here.
fn storage_response<K: Into<Vec<u8>>, V: Into<Vec<u8>>>(key: K, value: V) -> StorageResponse {
    StorageResponse { key: key.into(), value: value.into() }
}
// A minimal, uninstantiable `Config` implementation used by the tests below.
enum Conf {}
impl Config for Conf {
type AccountId = crate::utils::AccountId32;
type Address = crate::utils::MultiAddress<Self::AccountId, ()>;
type Signature = crate::utils::MultiSignature;
type Hasher = crate::config::bizinikiwi::BlakeTwo256;
type Header = crate::config::bizinikiwi::BizinikiwiHeader<u32, Self::Hasher>;
type ExtrinsicParams = DefaultExtrinsicParams<Self>;
type AssetId = u32;
}
// Tests for the legacy (old JSON-RPC API) backend against a mock RPC client,
// focusing on how it retries when the client reports
// `DisconnectedWillReconnect`.
mod legacy {
use super::*;
use crate::{
backend::legacy::{rpc_methods::RuntimeVersion, LegacyBackend},
error::RpcError,
};
use crate::backend::Backend;
// Build the client-facing runtime version with spec/transaction version
// both set to `num`.
fn client_runtime_version(num: u32) -> crate::client::RuntimeVersion {
crate::client::RuntimeVersion { spec_version: num, transaction_version: num }
}
// Build the RPC-layer runtime version with spec/transaction version both
// set to `num` and no extra fields.
fn runtime_version(num: u32) -> RuntimeVersion {
RuntimeVersion { spec_version: num, transaction_version: num, other: HashMap::new() }
}
// Fetching multiple storage values should transparently retry each key
// whose first request fails with a disconnect error.
#[tokio::test]
async fn storage_fetch_values() {
// Scripted responses per key: ID1/ID2 fail once before succeeding.
let mut values: HashMap<&str, VecDeque<_>> = HashMap::from_iter([
(
"ID1",
VecDeque::from_iter([
Err(disconnected_will_reconnect()),
Ok(Json(hex::encode("Data1"))),
]),
),
(
"ID2",
VecDeque::from_iter([
Err(disconnected_will_reconnect()),
Ok(Json(hex::encode("Data2"))),
]),
),
("ID3", VecDeque::from_iter([Ok(Json(hex::encode("Data3")))])),
]);
let rpc_client = MockRpcClient::builder()
.method_handler("state_getStorage", move |params| {
// Pull the requested key out of the RPC params and pop the next
// scripted response for it.
let params = params.map(|p| p.get().to_string());
let rpc_params = jsonrpsee::types::Params::new(params.as_deref());
let key: pezsp_core::Bytes = rpc_params.sequence().next().unwrap();
let key = std::str::from_utf8(&key.0).unwrap();
let values = values.get_mut(key).unwrap();
let value = values.pop_front().unwrap();
async move { value }
})
.build();
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let response = backend
.storage_fetch_values(
["ID1".into(), "ID2".into(), "ID3".into()].into(),
random_hash(),
)
.await
.unwrap();
let response = response.map(|x| x.unwrap()).collect::<Vec<StorageResponse>>().await;
// Despite the scripted disconnects, every key yields its value.
let expected = vec![
storage_response("ID1", "Data1"),
storage_response("ID2", "Data2"),
storage_response("ID3", "Data3"),
];
assert_eq!(expected, response)
}
// A single-value fetch also retries after a disconnect error.
#[tokio::test]
async fn storage_fetch_value() {
let rpc_client = MockRpcClient::builder()
.method_handler_once("state_getStorage", async move |_params| {
Err::<Infallible, _>(disconnected_will_reconnect())
})
.method_handler_once("state_getStorage", async move |_param| {
Json(hex::encode("Data1"))
})
.build();
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let response = backend.storage_fetch_value("ID1".into(), random_hash()).await.unwrap();
let response = response.unwrap();
assert_eq!("Data1".to_owned(), String::from_utf8(response).unwrap())
}
// Plain method calls (here: fetching the genesis hash) retry too.
#[tokio::test]
async fn simple_fetch() {
let hash = random_hash();
let rpc_client = MockRpcClient::builder()
.method_handler_once("chain_getBlockHash", async move |_params| {
Err::<Infallible, _>(disconnected_will_reconnect())
})
.method_handler_once("chain_getBlockHash", async move |_params| {
Json(hash)
})
.build();
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let response = backend.genesis_hash().await.unwrap();
assert_eq!(hash, response)
}
// Subscriptions: each inner vec scripts one subscription attempt. Based on
// the assertions below, a disconnect error mid-subscription causes a
// resubscription (dropping the rest of that script), so only versions
// 0, 4 and 5 surface; the final hard `Client` error is surfaced as an
// `Err` item and then the stream ends.
#[tokio::test]
async fn stream_simple() {
let mut data = VecDeque::from_iter([
vec![
Ok(Json(runtime_version(0))),
Err(disconnected_will_reconnect()),
Ok(Json(runtime_version(1))),
],
vec![
Err(disconnected_will_reconnect()),
Ok(Json(runtime_version(2))),
Ok(Json(runtime_version(3))),
],
vec![
Ok(Json(runtime_version(4))),
Ok(Json(runtime_version(5))),
Err(pezkuwi_subxt_rpcs::Error::Client("..".into())),
],
]);
let rpc_client = MockRpcClient::builder()
.subscription_handler("state_subscribeRuntimeVersion", move |_params, _unsub| {
// Each new subscription consumes the next script.
let res = data.pop_front().unwrap();
async move { res }
})
.build();
let backend: LegacyBackend<Conf> = LegacyBackend::builder().build(rpc_client);
let mut results = backend.stream_runtime_version().await.unwrap();
assert_eq!(results.next().await.unwrap().unwrap(), client_runtime_version(0));
assert_eq!(results.next().await.unwrap().unwrap(), client_runtime_version(4));
assert_eq!(results.next().await.unwrap().unwrap(), client_runtime_version(5));
assert!(matches!(
results.next().await.unwrap(),
Err(BackendError::Rpc(RpcError::ClientError(pezkuwi_subxt_rpcs::Error::Client(_))))
));
assert!(results.next().await.is_none());
}
}
// Tests for the chainHead ("unstable") backend against a mock RPC client.
// These drive the `chainHead_v1_follow` subscription by hand via a channel
// of `FollowEvent`s.
mod unstable_backend {
use pezkuwi_subxt_rpcs::methods::chain_head::{
self, Bytes, Initialized, MethodResponse, MethodResponseStarted, OperationError,
OperationId, OperationStorageItems, RuntimeSpec, RuntimeVersionEvent,
};
use tokio::select;
use super::{chain_head::*, *};
// Build a backend plus its driver; the test must poll the driver itself.
fn build_backend(
rpc_client: impl RpcClientT,
) -> (ChainHeadBackend<Conf>, ChainHeadBackendDriver<Conf>) {
let (backend, driver): (ChainHeadBackend<Conf>, _) =
ChainHeadBackend::builder().build(rpc_client);
(backend, driver)
}
// Build a backend whose driver is polled in a background task.
fn build_backend_spawn_background(rpc_client: impl RpcClientT) -> ChainHeadBackend<Conf> {
ChainHeadBackend::builder().build_with_background_driver(rpc_client)
}
// A representative runtime spec, deserialized from JSON.
fn runtime_spec() -> RuntimeSpec {
let spec = serde_json::json!({
"specName": "zagros",
"implName": "parity-zagros",
"specVersion": 9122,
"implVersion": 0,
"transactionVersion": 7,
"apis": {
"0xdf6acb689907609b": 3,
"0x37e397fc7c91f5e4": 1,
"0x40fe3ad401f8959a": 5,
"0xd2bc9897eed08f15": 3,
"0xf78b278be53f454c": 2,
"0xaf2c0297a23e6d3d": 1,
"0x49eaaf1b548a0cb0": 1,
"0x91d5df18b0d2cf58": 1,
"0xed99c5acb25eedf5": 3,
"0xcbca25e39f142387": 2,
"0x687ad44ad37f03c2": 1,
"0xab3c0572291feb8b": 1,
"0xbc9d89904f5b923f": 1,
"0x37c8bb1350a9a2a8": 1
}
});
serde_json::from_value(spec).expect("Mock runtime spec should be the right shape")
}
type FollowEvent = chain_head::FollowEvent<HashFor<Conf>>;
// Mock client whose follow subscriptions get ids 0, 1, 2, ...
fn mock_client_builder(
recv: tokio::sync::mpsc::UnboundedReceiver<FollowEvent>,
) -> MockRpcClientBuilder {
mock_client_builder_with_ids(recv, 0..)
}
// Mock client handling `chainHead_v1_follow`: each new subscription takes
// the next id from `ids`, immediately emits an `Initialized` event, and
// then forwards whatever `FollowEvent`s the test pushes into `recv`.
// When `ids` runs out, further subscriptions fail with "method not found".
fn mock_client_builder_with_ids<I>(
recv: tokio::sync::mpsc::UnboundedReceiver<FollowEvent>,
ids: I,
) -> MockRpcClientBuilder
where
I: IntoIterator<Item = usize> + Send,
I::IntoIter: Send + Sync + 'static,
{
use pezkuwi_subxt_rpcs::{client::mock_rpc_client::AndThen, Error, UserError};
let recv = Arc::new(tokio::sync::Mutex::new(recv));
let mut ids = ids.into_iter();
MockRpcClient::builder().subscription_handler(
"chainHead_v1_follow",
move |_params, _unsub| {
let recv = recv.clone();
let id = ids.next();
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
// Pump events from the shared test channel into this
// subscription until the subscription side is dropped.
tokio::spawn(async move {
let mut recv_guard = recv.lock().await;
loop {
select! {
_ = tx.closed() => {
break
},
Some(msg) = recv_guard.recv() => {
if tx.send(Json(msg)).is_err() {
break
}
}
}
}
});
async move {
if let Some(id) = id {
// Every subscription starts with an Initialized event
// carrying a valid runtime.
let follow_event =
FollowEvent::Initialized(Initialized::<HashFor<Conf>> {
finalized_block_hashes: vec![random_hash()],
finalized_block_runtime: Some(chain_head::RuntimeEvent::Valid(
RuntimeVersionEvent { spec: runtime_spec() },
)),
});
let res = AndThen(
(vec![Json(follow_event)], subscription_id(id)),
rx,
);
Ok(res)
} else {
Err(Error::User(UserError::method_not_found()))
}
}
},
)
}
// Helpers constructing the various follow events / method responses used
// in the tests below.
fn subscription_id(id: usize) -> String {
format!("chainHeadFollowSubscriptionId{id}")
}
fn response_started(id: &str) -> MethodResponse {
MethodResponse::Started(MethodResponseStarted {
operation_id: id.to_owned(),
discarded_items: None,
})
}
fn operation_error(id: &str) -> FollowEvent {
FollowEvent::OperationError(OperationError {
operation_id: id.to_owned(),
error: "error".to_owned(),
})
}
fn limit_reached() -> MethodResponse {
MethodResponse::LimitReached
}
fn storage_done(id: &str) -> FollowEvent {
FollowEvent::OperationStorageDone(OperationId { operation_id: id.to_owned() })
}
fn storage_result(key: &str, value: &str) -> chain_head::StorageResult {
chain_head::StorageResult {
key: Bytes(key.to_owned().into()),
result: chain_head::StorageResultType::Value(Bytes(value.to_owned().into())),
}
}
fn storage_items(id: &str, items: &[chain_head::StorageResult]) -> FollowEvent {
FollowEvent::OperationStorageItems(OperationStorageItems {
operation_id: id.to_owned(),
items: VecDeque::from(items.to_owned()),
})
}
fn operation_continue(id: &str) -> FollowEvent {
FollowEvent::OperationWaitingForContinue(OperationId { operation_id: id.to_owned() })
}
fn follow_event_stop() -> FollowEvent {
FollowEvent::Stop
}
// An `OperationError` for a storage operation surfaces as a single `Err`
// item and then the stream terminates.
#[tokio::test]
async fn storage_fetch_values_returns_stream_with_single_error() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rpc_client = mock_client_builder(rx)
.method_handler_once("chainHead_v1_storage", move |_params| {
// Emit the operation error shortly after the call starts.
tokio::spawn(async move {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
tx.send(operation_error("Id1")).unwrap();
});
async move { Json(response_started("Id1")) }
})
.build();
let backend = build_backend_spawn_background(rpc_client);
let mut response = backend
.storage_fetch_values(
["ID1".into(), "ID2".into(), "ID3".into()].into(),
random_hash(),
)
.await
.unwrap();
assert!(response
.next()
.await
.unwrap()
.is_err_and(|e| matches!(e, BackendError::Other(e) if e == "error")));
assert!(response.next().await.is_none());
}
// A `chainHead_v1_storage` call that first fails with a disconnect error
// is retried and the second attempt's items are streamed back.
#[tokio::test]
async fn storage_fetch_values_retry_query() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rpc_client = mock_client_builder(rx)
.method_handler_once("chainHead_v1_storage", async move |_params| {
Err::<Infallible, _>(disconnected_will_reconnect())
})
.method_handler_once("chainHead_v1_storage", async move |_params| {
tokio::spawn(async move {
tx.send(storage_items(
"Id1",
&[
storage_result("ID1", "Data1"),
storage_result("ID2", "Data2"),
storage_result("ID3", "Data3"),
],
))
.unwrap();
tx.send(storage_done("Id1")).unwrap();
});
Ok(Json(response_started("Id1")))
})
.build();
let backend = build_backend_spawn_background(rpc_client);
let response = backend
.storage_fetch_values(
["ID1".into(), "ID2".into(), "ID3".into()].into(),
random_hash(),
)
.await
.unwrap();
let response = response.map(|x| x.unwrap()).collect::<Vec<StorageResponse>>().await;
assert_eq!(
vec![
storage_response("ID1", "Data1"),
storage_response("ID2", "Data2"),
storage_response("ID3", "Data3"),
],
response
)
}
// When the server asks the client to continue
// (`OperationWaitingForContinue`), the backend calls
// `chainHead_v1_continue` — and that call, too, is retried after a
// disconnect error.
#[tokio::test]
async fn storage_fetch_values_retry_chainhead_continue() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let tx2 = tx.clone();
let rpc_client = mock_client_builder(rx)
.method_handler_once("chainHead_v1_storage", async move |_params| {
Err::<Infallible, _>(disconnected_will_reconnect())
})
.method_handler_once("chainHead_v1_storage", async move |_params| {
tokio::spawn(async move {
tx.send(storage_items("Id1", &[storage_result("ID1", "Data1")])).unwrap();
tx.send(operation_continue("Id1")).unwrap();
});
Ok(Json(response_started("Id1")))
})
.method_handler_once("chainHead_v1_continue", async move |_params| {
Err::<Infallible, _>(disconnected_will_reconnect())
})
.method_handler_once("chainHead_v1_continue", async move |_params| {
tokio::spawn(async move {
tx2.send(storage_items("Id1", &[storage_result("ID2", "Data2")])).unwrap();
tx2.send(storage_items("Id1", &[storage_result("ID3", "Data3")])).unwrap();
tx2.send(storage_done("Id1")).unwrap();
});
Ok(Json(()))
})
.build();
let backend = build_backend_spawn_background(rpc_client);
let response = backend
.storage_fetch_values(
["ID1".into(), "ID2".into(), "ID3".into()].into(),
random_hash(),
)
.await
.unwrap();
let response = response.map(|x| x.unwrap()).collect::<Vec<StorageResponse>>().await;
assert_eq!(
vec![
storage_response("ID1", "Data1"),
storage_response("ID2", "Data2"),
storage_response("ID3", "Data3"),
],
response
)
}
// Plain method calls (here: fetching the genesis hash) are retried after a
// disconnect error.
#[tokio::test]
async fn simple_fetch() {
let hash = random_hash();
let (_tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rpc_client = mock_client_builder(rx)
.method_handler_once("chainSpec_v1_genesisHash", async move |_params| {
Err::<Infallible, _>(disconnected_will_reconnect())
})
.method_handler_once("chainSpec_v1_genesisHash", async move |_params| {
Ok(Json(hash))
})
.build();
let backend = build_backend_spawn_background(rpc_client);
let response_hash = backend.genesis_hash().await.unwrap();
assert_eq!(hash, response_hash)
}
// After a `Stop` event the follow subscription restarts with a new id
// (ids script: [1, 2]). The storage handler rejects subscription id 1 with
// `LimitReached`, so the first query errors; once the driver has been
// polled enough to re-establish the subscription (id 2), a retried query
// succeeds.
#[tokio::test]
async fn stale_subscription_id_failure() {
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
let rpc_client = mock_client_builder_with_ids(rx, [1, 2])
.method_handler("chainHead_v1_storage", move |params| {
// Extract the follow-subscription id from the params.
let this_sub_id = {
let params = params.as_ref().map(|p| p.get());
let rpc_params = jsonrpsee::types::Params::new(params);
rpc_params.sequence().next::<String>().unwrap()
};
let is_wrong_sub_id = this_sub_id == subscription_id(1);
async move {
if is_wrong_sub_id {
Json(limit_reached())
} else {
Json(response_started("some_id"))
}
}
})
.build();
let (backend, mut driver): (ChainHeadBackend<Conf>, _) = build_backend(rpc_client);
tx.send(follow_event_stop()).unwrap();
let _ = driver.next().await.unwrap();
let response = backend.storage_fetch_values(["ID1".into()].into(), random_hash()).await;
assert!(matches!(response, Err(e) if e.is_rpc_limit_reached()));
// Drive the backend so it can resubscribe with the next id.
let _ = driver.next().await.unwrap();
let _ = driver.next().await.unwrap();
let _ = driver.next().await.unwrap();
let response = backend.storage_fetch_values(["ID1".into()].into(), random_hash()).await;
assert!(response.is_ok());
}
}
}