use self::rpc_methods::TransactionStatus as RpcTransactionStatus;
use crate::{
backend::{
utils::{retry, retry_stream},
Backend, BlockRef, RuntimeVersion, StorageResponse, StreamOf, StreamOfResults,
TransactionStatus,
},
config::{Config, HashFor, Header},
error::BackendError,
};
use async_trait::async_trait;
use futures::{future, future::Either, stream, Future, FutureExt, Stream, StreamExt, TryStreamExt};
use pezkuwi_subxt_rpcs::RpcClient;
use std::{
collections::VecDeque,
pin::Pin,
task::{Context, Poll},
};
/// Re-exports of the legacy RPC method types used by this backend.
pub mod rpc_methods {
    pub use pezkuwi_subxt_rpcs::methods::legacy::*;
}
pub use rpc_methods::LegacyRpcMethods;
/// A builder for constructing and configuring a [`LegacyBackend`].
pub struct LegacyBackendBuilder<T> {
    // Number of keys requested per RPC call when paginating descendant storage keys.
    storage_page_size: u32,
    // Ties the builder to a particular `Config` type without storing a `T`.
    _marker: std::marker::PhantomData<T>,
}
impl<T: Config> Default for LegacyBackendBuilder<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Config> LegacyBackendBuilder<T> {
pub fn new() -> Self {
Self { storage_page_size: 64, _marker: std::marker::PhantomData }
}
pub fn storage_page_size(mut self, storage_page_size: u32) -> Self {
self.storage_page_size = storage_page_size;
self
}
pub fn build(self, client: impl Into<RpcClient>) -> LegacyBackend<T> {
LegacyBackend {
storage_page_size: self.storage_page_size,
methods: LegacyRpcMethods::new(client.into()),
}
}
}
/// A backend which drives a node via the "legacy" JSON-RPC methods.
#[derive(Debug)]
pub struct LegacyBackend<T> {
    // Page size used when paginating descendant storage keys.
    storage_page_size: u32,
    // The legacy RPC methods used to make all node calls.
    methods: LegacyRpcMethods<T>,
}
impl<T> Clone for LegacyBackend<T> {
fn clone(&self) -> LegacyBackend<T> {
LegacyBackend { storage_page_size: self.storage_page_size, methods: self.methods.clone() }
}
}
impl<T: Config> LegacyBackend<T> {
    /// Return a [`LegacyBackendBuilder`] used to configure and construct
    /// a [`LegacyBackend`].
    pub fn builder() -> LegacyBackendBuilder<T> {
        LegacyBackendBuilder::<T>::new()
    }
}
// Marks the type as "sealed" so the `Backend` trait cannot be implemented externally.
impl<T: Config> super::sealed::Sealed for LegacyBackend<T> {}
#[async_trait]
impl<T: Config + Send + Sync + 'static> Backend<T> for LegacyBackend<T> {
    // Fetch the values for the given keys at a specific block, yielding one
    // `StorageResponse` per key that has a value.
    async fn storage_fetch_values(
        &self,
        keys: Vec<Vec<u8>>,
        at: HashFor<T>,
    ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
        // Fetch one storage entry, retrying the RPC call on transient errors.
        fn get_entry<T: Config>(
            key: Vec<u8>,
            at: HashFor<T>,
            methods: LegacyRpcMethods<T>,
        ) -> impl Future<Output = Result<Option<StorageResponse>, BackendError>> {
            retry(move || {
                // Clone per attempt so the retry closure can be invoked again.
                let methods = methods.clone();
                let key = key.clone();
                async move {
                    let res = methods.state_get_storage(&key, Some(at)).await?;
                    Ok(res.map(move |value| StorageResponse { key, value }))
                }
            })
        }
        // `keys` is already owned here; the previous `let keys = keys.clone();`
        // was a redundant allocation and has been removed.
        let methods = self.methods.clone();
        let iter = keys.into_iter().map(move |key| get_entry(key, at, methods.clone()));
        // Resolve the lookups sequentially, dropping keys which have no value.
        let s = stream::iter(iter)
            .then(|fut| fut)
            .filter_map(|r| future::ready(r.transpose()));
        Ok(StreamOf(Box::pin(s)))
    }

    // Stream all descendant keys of `key` at block `at`, one key at a time.
    async fn storage_fetch_descendant_keys(
        &self,
        key: Vec<u8>,
        at: HashFor<T>,
    ) -> Result<StreamOfResults<Vec<u8>>, BackendError> {
        // Paginated stream which yields whole pages of keys at a time.
        let keys = StorageFetchDescendantKeysStream {
            at,
            key,
            storage_page_size: self.storage_page_size,
            methods: self.methods.clone(),
            done: Default::default(),
            keys_fut: Default::default(),
            pagination_start_key: None,
        };
        // Flatten each page (or the error fetching it) into individual items.
        let keys = keys.flat_map(|keys| {
            match keys {
                Err(e) => {
                    Either::Left(stream::iter(std::iter::once(Err(e))))
                },
                Ok(keys) => {
                    Either::Right(stream::iter(keys.into_iter().map(Ok)))
                },
            }
        });
        Ok(StreamOf(Box::pin(keys)))
    }

    // Stream the (key, value) pairs for all descendants of `key` at block `at`.
    async fn storage_fetch_descendant_values(
        &self,
        key: Vec<u8>,
        at: HashFor<T>,
    ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
        let keys_stream = StorageFetchDescendantKeysStream {
            at,
            key,
            storage_page_size: self.storage_page_size,
            methods: self.methods.clone(),
            done: Default::default(),
            keys_fut: Default::default(),
            pagination_start_key: None,
        };
        // The values stream drives the keys stream internally, fetching the
        // values for each page of keys as it arrives.
        Ok(StreamOf(Box::pin(StorageFetchDescendantValuesStream {
            keys: keys_stream,
            results_fut: None,
            results: Default::default(),
        })))
    }

    // Fetch the genesis hash, retrying on transient errors.
    async fn genesis_hash(&self) -> Result<HashFor<T>, BackendError> {
        retry(|| async {
            let hash = self.methods.genesis_hash().await?;
            Ok(hash)
        })
        .await
    }

    // Fetch the header for the given block hash, if the block exists.
    async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> {
        retry(|| async {
            let header = self.methods.chain_get_header(Some(at)).await?;
            Ok(header)
        })
        .await
    }

    // Fetch the block body as a list of SCALE-encoded extrinsic bytes.
    async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, BackendError> {
        retry(|| async {
            let Some(details) = self.methods.chain_get_block(Some(at)).await? else {
                return Ok(None);
            };
            Ok(Some(details.block.extrinsics.into_iter().map(|b| b.0).collect()))
        })
        .await
    }

    // Fetch a reference to the most recently finalized block.
    async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, BackendError> {
        retry(|| async {
            let hash = self.methods.chain_get_finalized_head().await?;
            Ok(BlockRef::from_hash(hash))
        })
        .await
    }

    // Fetch the current runtime spec/transaction versions.
    async fn current_runtime_version(&self) -> Result<RuntimeVersion, BackendError> {
        retry(|| async {
            let details = self.methods.state_get_runtime_version(None).await?;
            Ok(RuntimeVersion {
                spec_version: details.spec_version,
                transaction_version: details.transaction_version,
            })
        })
        .await
    }

    // Subscribe to runtime version changes, transparently resubscribing when
    // the connection drops.
    async fn stream_runtime_version(
        &self,
    ) -> Result<StreamOfResults<RuntimeVersion>, BackendError> {
        let methods = self.methods.clone();
        let retry_sub = retry_stream(move || {
            let methods = methods.clone();
            Box::pin(async move {
                let sub = methods.state_subscribe_runtime_version().await?;
                let sub = sub.map_err(|e| e.into()).map(|r| {
                    r.map(|v| RuntimeVersion {
                        spec_version: v.spec_version,
                        transaction_version: v.transaction_version,
                    })
                });
                Ok(StreamOf(Box::pin(sub)))
            })
        })
        .await?;
        // Hide "disconnected but will reconnect" errors from the consumer;
        // `retry_stream` will establish a new subscription behind the scenes.
        let stream = retry_sub.filter(|r| {
            let mut keep = true;
            if let Err(e) = r {
                if e.is_disconnected_will_reconnect() {
                    keep = false;
                }
            }
            async move { keep }
        });
        Ok(StreamOf(Box::pin(stream)))
    }

    // Subscribe to all (not necessarily best/finalized) block headers.
    async fn stream_all_block_headers(
        &self,
        hasher: T::Hasher,
    ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
        let methods = self.methods.clone();
        let retry_sub = retry_stream(move || {
            let methods = methods.clone();
            Box::pin(async move {
                let sub = methods.chain_subscribe_all_heads().await?;
                let sub = sub.map_err(|e| e.into()).map(move |r| {
                    r.map(|h| {
                        // Pair each header with a BlockRef computed from its hash.
                        let hash = h.hash_with(hasher);
                        (h, BlockRef::from_hash(hash))
                    })
                });
                Ok(StreamOf(Box::pin(sub)))
            })
        })
        .await?;
        Ok(retry_sub)
    }

    // Subscribe to "best" block headers.
    async fn stream_best_block_headers(
        &self,
        hasher: T::Hasher,
    ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
        let methods = self.methods.clone();
        let retry_sub = retry_stream(move || {
            let methods = methods.clone();
            Box::pin(async move {
                let sub = methods.chain_subscribe_new_heads().await?;
                let sub = sub.map_err(|e| e.into()).map(move |r| {
                    r.map(|h| {
                        let hash = h.hash_with(hasher);
                        (h, BlockRef::from_hash(hash))
                    })
                });
                Ok(StreamOf(Box::pin(sub)))
            })
        })
        .await?;
        Ok(retry_sub)
    }

    // Subscribe to finalized block headers. The node may skip block numbers
    // between finality notifications, so any gaps are filled in by fetching
    // the missing headers explicitly.
    async fn stream_finalized_block_headers(
        &self,
        hasher: T::Hasher,
    ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
        let this = self.clone();
        let retry_sub = retry_stream(move || {
            let this = this.clone();
            Box::pin(async move {
                let sub = this.methods.chain_subscribe_finalized_heads().await?;
                // Establish the block number to fill gaps from.
                let last_finalized_block_ref = this.latest_finalized_block_ref().await?;
                let last_finalized_block_num = this
                    .block_header(last_finalized_block_ref.hash())
                    .await?
                    .map(|h| h.number().into());
                let sub = subscribe_to_block_headers_filling_in_gaps(
                    this.methods.clone(),
                    sub,
                    last_finalized_block_num,
                );
                let sub = sub.map(move |r| {
                    r.map(|h| {
                        let hash = h.hash_with(hasher);
                        (h, BlockRef::from_hash(hash))
                    })
                });
                Ok(StreamOf(Box::pin(sub)))
            })
        })
        .await?;
        Ok(retry_sub)
    }

    // Submit a transaction and map the legacy status events onto the
    // backend-agnostic `TransactionStatus`. Events with no equivalent
    // (Future, Retracted) are filtered out.
    async fn submit_transaction(
        &self,
        extrinsic: &[u8],
    ) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, BackendError> {
        let sub = self.methods.author_submit_and_watch_extrinsic(extrinsic).await?;
        let sub = sub.filter_map(|r| {
            let mapped =
                r.map_err(|e| e.into())
                    .map(|tx| {
                        match tx {
                            RpcTransactionStatus::Future => None,
                            RpcTransactionStatus::Retracted(_) => None,
                            RpcTransactionStatus::Ready => Some(TransactionStatus::Validated),
                            RpcTransactionStatus::Broadcast(_peers) =>
                                Some(TransactionStatus::Broadcasted),
                            RpcTransactionStatus::InBlock(hash) =>
                                Some(TransactionStatus::InBestBlock { hash: BlockRef::from_hash(hash) }),
                            RpcTransactionStatus::FinalityTimeout(_) =>
                                Some(TransactionStatus::Dropped { message: "Finality timeout".into() }),
                            RpcTransactionStatus::Finalized(hash) =>
                                Some(TransactionStatus::InFinalizedBlock {
                                    hash: BlockRef::from_hash(hash),
                                }),
                            RpcTransactionStatus::Usurped(_) => Some(TransactionStatus::Invalid {
                                message: "Transaction was usurped by another with the same nonce"
                                    .into(),
                            }),
                            RpcTransactionStatus::Dropped => Some(TransactionStatus::Dropped {
                                message: "Transaction was dropped".into(),
                            }),
                            RpcTransactionStatus::Invalid => Some(TransactionStatus::Invalid {
                                message:
                                    "Transaction is invalid (eg because of a bad nonce, signature etc)"
                                        .into(),
                            }),
                        }
                    })
                    .transpose();
            future::ready(mapped)
        });
        Ok(StreamOf::new(Box::pin(sub)))
    }

    // Make a runtime API call at the given block, retrying on transient errors.
    async fn call(
        &self,
        method: &str,
        call_parameters: Option<&[u8]>,
        at: HashFor<T>,
    ) -> Result<Vec<u8>, BackendError> {
        retry(|| async {
            let res = self.methods.state_call(method, call_parameters, Some(at)).await?;
            Ok(res)
        })
        .await
    }
}
/// Wrap a stream of block headers so that any block numbers skipped between
/// consecutive items are filled in by fetching the missing headers via RPC.
/// `last_block_num` is the number of the last block already seen, if any;
/// gap-filling starts from the block after it.
#[doc(hidden)]
pub fn subscribe_to_block_headers_filling_in_gaps<T, S, E>(
    methods: LegacyRpcMethods<T>,
    sub: S,
    mut last_block_num: Option<u64>,
) -> impl Stream<Item = Result<T::Header, BackendError>> + Send
where
    T: Config,
    S: Stream<Item = Result<T::Header, E>> + Send,
    E: Into<BackendError> + Send + 'static,
{
    sub.flat_map(move |s| {
        let header = match s {
            Ok(header) => header,
            // Pass errors straight through as a one-item stream.
            Err(e) => return Either::Left(stream::once(async { Err(e.into()) })),
        };
        // Work out which block numbers (if any) sit between the last header we
        // emitted and this one. If we haven't seen a block yet, there's no gap.
        let end_block_num = header.number().into();
        let start_block_num = last_block_num.map(|n| n + 1).unwrap_or(end_block_num);
        let methods = methods.clone();
        // Fetch each missing header in order, one at a time.
        let previous_headers = stream::iter(start_block_num..end_block_num)
            .then(move |n| {
                let methods = methods.clone();
                async move {
                    let hash = methods.chain_get_block_hash(Some(n.into())).await?;
                    let header = methods.chain_get_header(hash).await?;
                    Ok::<_, BackendError>(header)
                }
            })
            // Skip block numbers for which no header could be found.
            .filter_map(async |h| h.transpose());
        last_block_num = Some(end_block_num);
        // Emit the gap headers followed by the header we actually received.
        Either::Right(previous_headers.chain(stream::once(async { Ok(header) })))
    })
}
/// A stream which yields pages of descendant storage keys under `key` at block `at`,
/// fetching one page per RPC call.
#[allow(clippy::type_complexity)]
pub struct StorageFetchDescendantKeysStream<T: Config> {
    // RPC methods used to fetch each page of keys.
    methods: LegacyRpcMethods<T>,
    // The root key whose descendants are being enumerated.
    key: Vec<u8>,
    // The block hash at which the keys are queried.
    at: HashFor<T>,
    // Maximum number of keys requested per RPC call.
    storage_page_size: u32,
    // The last key of the previous page; the next page starts after it.
    pagination_start_key: Option<Vec<u8>>,
    // The in-flight request for the next page of keys, if one has been started.
    keys_fut:
        Option<Pin<Box<dyn Future<Output = Result<Vec<Vec<u8>>, BackendError>> + Send + 'static>>>,
    // Set once an empty page is returned; the stream then yields `None` forever.
    done: bool,
}
// Unpin is implemented manually so `poll_next` can use `as_mut()` freely; the
// only future held is boxed (heap-pinned), so moving the struct itself is fine.
impl<T: Config> std::marker::Unpin for StorageFetchDescendantKeysStream<T> {}
impl<T: Config> Stream for StorageFetchDescendantKeysStream<T> {
    type Item = Result<Vec<Vec<u8>>, BackendError>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut();
        loop {
            // Once finished, keep returning None.
            if this.done {
                return Poll::Ready(None);
            }
            // Poll the in-flight page request, if any.
            if let Some(mut keys_fut) = this.keys_fut.take() {
                let Poll::Ready(keys) = keys_fut.poll_unpin(cx) else {
                    // Not ready yet: put the future back and wait to be woken.
                    this.keys_fut = Some(keys_fut);
                    return Poll::Pending;
                };
                match keys {
                    Ok(mut keys) => {
                        // The pagination start key may be echoed back as the
                        // first key of the page; drop it so it isn't yielded twice.
                        if this.pagination_start_key.is_some()
                            && keys.first() == this.pagination_start_key.as_ref()
                        {
                            keys.remove(0);
                        }
                        if keys.is_empty() {
                            // An empty page means there are no more keys.
                            this.done = true;
                            return Poll::Ready(None);
                        }
                        // The next page starts after the last key we've seen.
                        this.pagination_start_key = keys.last().cloned();
                        return Poll::Ready(Some(Ok(keys)));
                    },
                    Err(e) => {
                        if e.is_disconnected_will_reconnect() {
                            // BUG FIX: previously the *completed* future was stored
                            // back into `this.keys_fut` and re-polled on the next
                            // loop iteration, which panics (futures must not be
                            // polled after completion). Instead, drop it here and
                            // `continue`: `keys_fut` is now None, so the code below
                            // builds a fresh request with the same
                            // `pagination_start_key`, retrying the page fetch.
                            continue;
                        }
                        return Poll::Ready(Some(Err(e)));
                    },
                }
            }
            // No request in flight: start fetching the next page of keys.
            let methods = this.methods.clone();
            let key = this.key.clone();
            let at = this.at;
            let storage_page_size = this.storage_page_size;
            let pagination_start_key = this.pagination_start_key.clone();
            let keys_fut = async move {
                let keys = methods
                    .state_get_keys_paged(
                        &key,
                        storage_page_size,
                        pagination_start_key.as_deref(),
                        Some(at),
                    )
                    .await?;
                Ok(keys)
            };
            this.keys_fut = Some(Box::pin(keys_fut));
            // Loop around to poll the freshly created future.
        }
    }
}
/// A stream which yields the storage values for all descendant keys of some root
/// key, by driving a [`StorageFetchDescendantKeysStream`] and fetching the values
/// for each page of keys that it produces.
#[allow(clippy::type_complexity)]
pub struct StorageFetchDescendantValuesStream<T: Config> {
    // The underlying stream of key pages to fetch values for.
    keys: StorageFetchDescendantKeysStream<T>,
    // The in-flight request fetching values for the current page of keys, if any.
    results_fut: Option<
        Pin<
            Box<
                dyn Future<Output = Result<Option<VecDeque<(Vec<u8>, Vec<u8>)>>, BackendError>>
                    + Send
                    + 'static,
            >,
        >,
    >,
    // Buffered (key, value) pairs waiting to be handed out one at a time.
    results: VecDeque<(Vec<u8>, Vec<u8>)>,
}
impl<T: Config> Stream for StorageFetchDescendantValuesStream<T> {
    type Item = Result<StorageResponse, BackendError>;
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.as_mut();
        loop {
            // Hand out any already-fetched values first, one per poll.
            if let Some((key, value)) = this.results.pop_front() {
                let res = StorageResponse { key, value };
                return Poll::Ready(Some(Ok(res)));
            }
            // Poll the in-flight "fetch values for a page of keys" request, if any.
            if let Some(mut results_fut) = this.results_fut.take() {
                match results_fut.poll_unpin(cx) {
                    Poll::Ready(Ok(Some(results))) => {
                        // Buffer the values; the loop above will yield them.
                        this.results = results;
                        continue;
                    },
                    Poll::Ready(Ok(None)) => {
                        // No values; move on to the next page of keys.
                        // NOTE(review): the future built below always resolves to
                        // `Some(..)`, so this branch appears unreachable at present.
                        continue;
                    },
                    Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
                    Poll::Pending => {
                        this.results_fut = Some(results_fut);
                        return Poll::Pending;
                    },
                }
            }
            // Otherwise, pull the next page of keys and start fetching their values.
            match this.keys.poll_next_unpin(cx) {
                Poll::Ready(Some(Ok(keys))) => {
                    let methods = this.keys.methods.clone();
                    let at = this.keys.at;
                    let results_fut = async move {
                        // Borrow each key as a `&[u8]` for the RPC call. Cloning the
                        // iterator (not the keys) lets the retry closure run again.
                        let keys = keys.iter().map(|k| &**k);
                        let values = retry(|| async {
                            let res =
                                methods.state_query_storage_at(keys.clone(), Some(at)).await?;
                            Ok(res)
                        })
                        .await?;
                        // Flatten the returned change sets, keeping only entries
                        // that actually have a value.
                        let values: VecDeque<_> = values
                            .into_iter()
                            .flat_map(|v| {
                                v.changes.into_iter().filter_map(|(k, v)| {
                                    let v = v?;
                                    Some((k.0, v.0))
                                })
                            })
                            .collect();
                        Ok(Some(values))
                    };
                    this.results_fut = Some(Box::pin(results_fut));
                    continue;
                },
                Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
                Poll::Ready(None) => return Poll::Ready(None),
                Poll::Pending => return Poll::Pending,
            }
        }
    }
}