use crate::backend::chain_head::ChainHeadBackendDriver;
use crate::backend::{
Backend, BlockRef, StorageResponse, StreamOfResults, TransactionStatus,
archive::ArchiveBackend, chain_head::ChainHeadBackend, legacy::LegacyBackend,
};
use crate::config::{Config, HashFor};
use crate::error::{BackendError, CombinedBackendError};
use async_trait::async_trait;
use futures::Stream;
use futures::StreamExt;
use std::task::Poll;
use subxt_rpcs::RpcClient;
/// Builder for a [`CombinedBackend`].
///
/// Holds one choice per underlying backend (archive, chainHead and legacy): use an explicitly
/// provided instance, construct a default one at build time, or don't use that backend at all.
#[derive(Debug)]
pub struct CombinedBackendBuilder<T: Config> {
// What to do about the archive backend.
archive: BackendChoice<ArchiveBackend<T>>,
// What to do about the chainHead backend.
chainhead: BackendChoice<ChainHeadBackend<T>>,
// What to do about the legacy backend.
legacy: BackendChoice<LegacyBackend<T>>,
}
/// How a single backend slot of the builder should be filled.
#[derive(Debug)]
enum BackendChoice<V> {
    /// Use exactly this, caller-provided, value.
    Use(V),
    /// Leave the slot empty.
    DontUse,
    /// Construct a default value when the slot is resolved.
    UseDefault,
}

impl<V> BackendChoice<V> {
    /// Resolve this choice into an optional value.
    ///
    /// - `Use(v)` yields `Some(v)`; `default` is not called.
    /// - `UseDefault` yields `Some(default())`.
    /// - `DontUse` yields `None`; `default` is not called.
    fn use_with_default<F: FnOnce() -> V>(self, default: F) -> Option<V> {
        let chosen = match self {
            Self::Use(explicit) => explicit,
            Self::UseDefault => default(),
            Self::DontUse => return None,
        };
        Some(chosen)
    }
}
impl<T: Config> Default for CombinedBackendBuilder<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Config> CombinedBackendBuilder<T> {
pub fn new() -> Self {
CombinedBackendBuilder {
archive: BackendChoice::UseDefault,
chainhead: BackendChoice::UseDefault,
legacy: BackendChoice::UseDefault,
}
}
pub fn with_archive_backend(mut self, backend: ArchiveBackend<T>) -> Self {
self.archive = BackendChoice::Use(backend);
self
}
pub fn with_chainhead_backend(mut self, backend: ChainHeadBackend<T>) -> Self {
self.chainhead = BackendChoice::Use(backend);
self
}
pub fn with_legacy_backend(mut self, backend: LegacyBackend<T>) -> Self {
self.legacy = BackendChoice::Use(backend);
self
}
pub fn no_default_backends(mut self) -> Self {
if matches!(self.legacy, BackendChoice::UseDefault) {
self.legacy = BackendChoice::DontUse;
}
if matches!(self.archive, BackendChoice::UseDefault) {
self.archive = BackendChoice::DontUse;
}
if matches!(self.chainhead, BackendChoice::UseDefault) {
self.chainhead = BackendChoice::DontUse;
}
self
}
pub async fn build(
self,
rpc_client: impl Into<RpcClient>,
) -> Result<(CombinedBackend<T>, CombinedBackendDriver<T>), CombinedBackendError> {
let rpc_client = rpc_client.into();
#[derive(serde::Deserialize)]
struct Methods {
methods: Vec<String>,
}
let methods: Methods = rpc_client
.request("rpc_methods", subxt_rpcs::rpc_params![])
.await
.map_err(CombinedBackendError::CouldNotObtainRpcMethodList)?;
let methods = methods.methods;
let has_archive_methods = methods.iter().any(|m| m.starts_with("archive_v1_"));
let has_chainhead_methods = methods.iter().any(|m| m.starts_with("chainHead_v1_"));
let mut combined_driver = CombinedBackendDriver {
chainhead_driver: None,
};
let archive = if has_archive_methods {
self.archive
.use_with_default(|| ArchiveBackend::new(rpc_client.clone()))
} else {
None
};
let chainhead = if has_chainhead_methods {
self.chainhead.use_with_default(|| {
let (chainhead, chainhead_driver) =
ChainHeadBackend::builder().build(rpc_client.clone());
combined_driver.chainhead_driver = Some(chainhead_driver);
chainhead
})
} else {
None
};
let legacy = self
.legacy
.use_with_default(|| LegacyBackend::builder().build(rpc_client.clone()));
let combined = CombinedBackend {
archive,
chainhead,
legacy,
};
Ok((combined, combined_driver))
}
#[cfg(feature = "runtime")]
pub async fn build_with_background_driver(
self,
client: impl Into<RpcClient>,
) -> Result<CombinedBackend<T>, CombinedBackendError> {
let (backend, mut driver) = self.build(client).await?;
super::utils::spawn(async move {
while let Some(res) = driver.next().await {
if let Err(err) = res {
tracing::debug!(target: "subxt", "chainHead backend error={err}");
}
}
tracing::debug!(target: "subxt", "combined backend was closed");
});
Ok(backend)
}
}
/// Driver for a [`CombinedBackend`], implemented as a `Stream`.
///
/// It wraps the driver of the chainHead backend when the builder constructed a default
/// chainHead backend, and is empty otherwise.
#[derive(Debug)]
pub struct CombinedBackendDriver<T: Config> {
// `Some` only when `CombinedBackendBuilder::build` created a default chainHead backend.
chainhead_driver: Option<ChainHeadBackendDriver<T>>,
}
impl<T: Config> CombinedBackendDriver<T> {
    /// Returns `true` if this driver wraps an inner chainHead driver, and `false` if it is
    /// empty (in which case polling it yields `None` straight away).
    pub fn needs_polling(&self) -> bool {
        // Destructure exhaustively so that a newly added inner driver forces a revisit here.
        let Self { chainhead_driver } = self;
        chainhead_driver.is_some()
    }
}
impl<T: Config> Stream for CombinedBackendDriver<T> {
    type Item = <ChainHeadBackendDriver<T> as Stream>::Item;

    /// Forward the poll to the inner chainHead driver; with no inner driver the stream is
    /// reported as finished immediately.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        let Some(inner) = self.chainhead_driver.as_mut() else {
            return Poll::Ready(None);
        };
        inner.poll_next_unpin(cx)
    }
}
/// A backend which holds up to three underlying backends (archive, chainHead and legacy) and
/// hands each request to them in a per-method order until one succeeds.
#[derive(Debug)]
pub struct CombinedBackend<T: Config> {
// `None` when the archive backend is not in use.
archive: Option<ArchiveBackend<T>>,
// `None` when the chainHead backend is not in use.
chainhead: Option<ChainHeadBackend<T>>,
// `None` when the legacy backend is not in use.
legacy: Option<LegacyBackend<T>>,
}
impl<T: Config> CombinedBackend<T> {
    /// Start configuring a new [`CombinedBackend`].
    pub fn builder() -> CombinedBackendBuilder<T> {
        CombinedBackendBuilder::new()
    }

    /// The archive backend as a trait object, if it is in use.
    fn archive(&self) -> Option<&dyn Backend<T>> {
        self.archive
            .as_ref()
            .map(|backend| backend as &dyn Backend<T>)
    }

    /// The chainHead backend as a trait object, if it is in use.
    fn chainhead(&self) -> Option<&dyn Backend<T>> {
        self.chainhead
            .as_ref()
            .map(|backend| backend as &dyn Backend<T>)
    }

    /// The legacy backend as a trait object, if it is in use.
    fn legacy(&self) -> Option<&dyn Backend<T>> {
        self.legacy
            .as_ref()
            .map(|backend| backend as &dyn Backend<T>)
    }
}
// Mark `CombinedBackend` with the `Sealed` trait from the parent module's `sealed` module
// (presumably a prerequisite for implementing `Backend` — confirm against the trait definition).
impl<T: Config> super::sealed::Sealed for CombinedBackend<T> {}
// Every method below builds an ordered list of candidate backends and hands it to
// `try_backends`, which returns the first success (or the last error seen). Backends that
// are not in use show up as `None` in the list and are skipped.
#[async_trait]
impl<T: Config> Backend<T> for CombinedBackend<T> {
    /// Tries archive, then chainHead, then legacy.
    async fn storage_fetch_values(
        &self,
        keys: Vec<Vec<u8>>,
        at: HashFor<T>,
    ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
        let candidates = [self.archive(), self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            // The keys are cloned per attempt since each backend takes them by value.
            async |backend: &dyn Backend<T>| backend.storage_fetch_values(keys.clone(), at).await,
        )
        .await
    }

    /// Tries archive, then chainHead, then legacy.
    async fn storage_fetch_descendant_keys(
        &self,
        key: Vec<u8>,
        at: HashFor<T>,
    ) -> Result<StreamOfResults<Vec<u8>>, BackendError> {
        let candidates = [self.archive(), self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| {
                backend.storage_fetch_descendant_keys(key.clone(), at).await
            },
        )
        .await
    }

    /// Tries archive, then chainHead, then legacy.
    async fn storage_fetch_descendant_values(
        &self,
        key: Vec<u8>,
        at: HashFor<T>,
    ) -> Result<StreamOfResults<StorageResponse>, BackendError> {
        let candidates = [self.archive(), self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| {
                backend.storage_fetch_descendant_values(key.clone(), at).await
            },
        )
        .await
    }

    /// Tries archive, then chainHead, then legacy.
    async fn genesis_hash(&self) -> Result<HashFor<T>, BackendError> {
        let candidates = [self.archive(), self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.genesis_hash().await,
        )
        .await
    }

    /// Tries archive, then legacy, and chainHead only as the last resort.
    async fn block_number_to_hash(
        &self,
        number: u64,
    ) -> Result<Option<BlockRef<HashFor<T>>>, BackendError> {
        let candidates = [self.archive(), self.legacy(), self.chainhead()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.block_number_to_hash(number).await,
        )
        .await
    }

    /// Tries archive, then chainHead, then legacy.
    async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError> {
        let candidates = [self.archive(), self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.block_header(at).await,
        )
        .await
    }

    /// Tries archive, then chainHead, then legacy.
    async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, BackendError> {
        let candidates = [self.archive(), self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.block_body(at).await,
        )
        .await
    }

    /// Tries chainHead first, then archive, then legacy.
    async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, BackendError> {
        let candidates = [self.chainhead(), self.archive(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.latest_finalized_block_ref().await,
        )
        .await
    }

    /// Tries chainHead, then legacy. The archive backend is never consulted here.
    async fn stream_all_block_headers(
        &self,
        hasher: T::Hasher,
    ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
        let candidates = [self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.stream_all_block_headers(hasher.clone()).await,
        )
        .await
    }

    /// Tries chainHead, then legacy. The archive backend is never consulted here.
    async fn stream_best_block_headers(
        &self,
        hasher: T::Hasher,
    ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
        let candidates = [self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| {
                backend.stream_best_block_headers(hasher.clone()).await
            },
        )
        .await
    }

    /// Tries chainHead, then legacy. The archive backend is never consulted here.
    async fn stream_finalized_block_headers(
        &self,
        hasher: T::Hasher,
    ) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError> {
        let candidates = [self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| {
                backend.stream_finalized_block_headers(hasher.clone()).await
            },
        )
        .await
    }

    /// Tries chainHead, then legacy, and archive only as the last resort.
    async fn submit_transaction(
        &self,
        extrinsic: &[u8],
    ) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, BackendError> {
        let candidates = [self.chainhead(), self.legacy(), self.archive()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.submit_transaction(extrinsic).await,
        )
        .await
    }

    /// Tries archive, then chainHead, then legacy.
    async fn call(
        &self,
        method: &str,
        call_parameters: Option<&[u8]>,
        at: HashFor<T>,
    ) -> Result<Vec<u8>, BackendError> {
        let candidates = [self.archive(), self.chainhead(), self.legacy()];
        try_backends(
            &candidates,
            async |backend: &dyn Backend<T>| backend.call(method, call_parameters, at).await,
        )
        .await
    }
}
/// Run `f` against each available backend in order, returning the first successful result.
///
/// # Parameters
///
/// - `backends`: candidate backends in priority order; `None` entries are skipped.
/// - `f`: the operation to attempt; it is called once per backend until one call succeeds.
///
/// # Errors
///
/// If every attempted backend fails, the error from the *last* backend tried is returned
/// (earlier errors are discarded). If there was no backend to try at all, a generic
/// "no capable backend" error is returned.
async fn try_backends<'s, 'b, T, Func, Fut, O>(
    backends: &'s [Option<&'b dyn Backend<T>>],
    mut f: Func,
) -> Result<O, BackendError>
where
    'b: 's,
    T: Config,
    Func: FnMut(&'b dyn Backend<T>) -> Fut,
    Fut: Future<Output = Result<O, BackendError>> + 'b,
{
    const NO_AVAILABLE_BACKEND: &str =
        "None of the configured backends are capable of handling this request";

    // Stays `None` until some backend has actually failed, so that the fallback error is
    // only constructed when it is really needed rather than on every (successful) call.
    let mut last_err = None;
    for backend in backends.iter().filter_map(|b| *b) {
        match f(backend).await {
            Ok(res) => return Ok(res),
            Err(e) => last_err = Some(e),
        }
    }
    Err(last_err.unwrap_or_else(|| BackendError::other(NO_AVAILABLE_BACKEND)))
}