mod archive;
mod chain_head;
mod combined;
mod legacy;
pub mod utils;
use crate::config::{Config, HashFor};
use crate::error::BackendError;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use std::pin::Pin;
use std::sync::Arc;
pub use archive::ArchiveBackend;
pub use chain_head::{ChainHeadBackend, ChainHeadBackendBuilder, ChainHeadBackendDriver};
pub use combined::{CombinedBackend, CombinedBackendBuilder, CombinedBackendDriver};
pub use legacy::{LegacyBackend, LegacyBackendBuilder};
#[doc(hidden)]
pub use legacy::subscribe_to_block_headers_filling_in_gaps;
// Private module holding the marker trait used as a supertrait of `Backend`.
// The module itself is not `pub`, so code that cannot name `sealed::Sealed`
// cannot implement `Backend` either.
#[doc(hidden)]
mod sealed {
/// Marker trait with no methods; it is required by `Backend` as a supertrait.
pub trait Sealed {}
}
/// The set of operations a backend implementation provides for a chain
/// configuration `T`.
///
/// Implementors must also implement `sealed::Sealed` (declared in a private
/// module) and be `Send + Sync + 'static`. Every method is async (via
/// `#[async_trait]`) and reports failure as a [`BackendError`].
#[async_trait]
pub trait Backend<T: Config>: sealed::Sealed + Send + Sync + 'static {
/// Fetch storage entries for the given `keys` as of the block with hash `at`.
///
/// On success, returns a stream whose items are `Result<StorageResponse, BackendError>`.
// NOTE(review): how keys without a stored value are reported is not visible here;
// `BackendExt::storage_fetch_value` treats an empty stream as "no value".
async fn storage_fetch_values(
&self,
keys: Vec<Vec<u8>>,
at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError>;
/// Fetch storage keys related to `key` as of the block with hash `at`, as a
/// stream of raw key bytes.
// NOTE(review): presumably yields the keys that have `key` as a prefix — confirm
// against the implementations.
async fn storage_fetch_descendant_keys(
&self,
key: Vec<u8>,
at: HashFor<T>,
) -> Result<StreamOfResults<Vec<u8>>, BackendError>;
/// Like [`Backend::storage_fetch_descendant_keys`], but the stream items are
/// [`StorageResponse`]s (key and value) rather than bare keys.
async fn storage_fetch_descendant_values(
&self,
key: Vec<u8>,
at: HashFor<T>,
) -> Result<StreamOfResults<StorageResponse>, BackendError>;
/// Return the hash of the genesis block.
async fn genesis_hash(&self) -> Result<HashFor<T>, BackendError>;
/// Look up the block with the given `number`, returning a [`BlockRef`] to it,
/// or `None`.
// NOTE(review): presumably `None` means no block with that number is known — confirm.
async fn block_number_to_hash(
&self,
number: u64,
) -> Result<Option<BlockRef<HashFor<T>>>, BackendError>;
/// Return the header of the block with hash `at`, or `None`.
async fn block_header(&self, at: HashFor<T>) -> Result<Option<T::Header>, BackendError>;
/// Return the body of the block with hash `at` as a list of byte vectors, or `None`.
// NOTE(review): presumably each inner `Vec<u8>` is one encoded extrinsic — confirm.
async fn block_body(&self, at: HashFor<T>) -> Result<Option<Vec<Vec<u8>>>, BackendError>;
/// Return a [`BlockRef`] for the latest finalized block.
async fn latest_finalized_block_ref(&self) -> Result<BlockRef<HashFor<T>>, BackendError>;
/// Return a stream of `(header, block reference)` pairs for all blocks.
///
/// `hasher` is the configuration's hasher, passed in by the caller.
// NOTE(review): presumably `hasher` is used to derive block hashes from headers,
// and "all" covers every newly announced block — confirm.
async fn stream_all_block_headers(
&self,
hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError>;
/// Return a stream of `(header, block reference)` pairs for best blocks.
///
/// Takes the same `hasher` argument as [`Backend::stream_all_block_headers`].
async fn stream_best_block_headers(
&self,
hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError>;
/// Return a stream of `(header, block reference)` pairs for finalized blocks.
///
/// Takes the same `hasher` argument as [`Backend::stream_all_block_headers`].
async fn stream_finalized_block_headers(
&self,
hasher: T::Hasher,
) -> Result<StreamOfResults<(T::Header, BlockRef<HashFor<T>>)>, BackendError>;
/// Submit the transaction given as raw `bytes`.
///
/// On success, returns a stream of [`TransactionStatus`] results for it.
async fn submit_transaction(
&self,
bytes: &[u8],
) -> Result<StreamOfResults<TransactionStatus<HashFor<T>>>, BackendError>;
/// Invoke `method` with optional `call_parameters` bytes against the block with
/// hash `at`, returning the raw response bytes.
// NOTE(review): presumably a runtime API call with encoded parameters — confirm.
async fn call(
&self,
method: &str,
call_parameters: Option<&[u8]>,
at: HashFor<T>,
) -> Result<Vec<u8>, BackendError>;
}
/// Crate-internal convenience methods layered on top of [`Backend`].
///
/// A blanket implementation below provides this for every `Backend`, including
/// unsized ones such as trait objects.
#[async_trait]
pub(crate) trait BackendExt<T: Config>: Backend<T> {
    /// Fetch the value stored under a single `key` as of the block with hash `at`.
    ///
    /// Delegates to [`Backend::storage_fetch_values`] with a one-element key list
    /// and looks only at the first item of the resulting stream:
    ///
    /// - an empty stream gives `Ok(None)`,
    /// - a successful first item gives `Ok(Some(value))`,
    /// - an error from the call or from the first item is returned as is.
    async fn storage_fetch_value(
        &self,
        key: Vec<u8>,
        at: HashFor<T>,
    ) -> Result<Option<Vec<u8>>, BackendError> {
        let mut responses = self.storage_fetch_values(vec![key], at).await?;
        match responses.next().await {
            Some(Ok(response)) => Ok(Some(response.value)),
            Some(Err(err)) => Err(err),
            None => Ok(None),
        }
    }
}

// Every `Backend` gets the extension methods for free.
#[async_trait]
impl<B: Backend<T> + ?Sized, T: Config> BackendExt<T> for B {}
/// A block hash, optionally paired with an opaque, reference-counted handle.
///
/// Equality, ordering, hashing and `Debug` output all look at the hash alone;
/// the handle is never inspected by any code in this type.
// NOTE(review): presumably the handle exists so that a backend can tie some
// resource to the lifetime of the reference and its clones — confirm against
// the backend implementations.
#[derive(Clone)]
pub struct BlockRef<H> {
    hash: H,
    // Stored only so that it is kept alive (and eventually dropped) together
    // with this value; it is never read.
    _pointer: Option<Arc<dyn BlockRefT>>,
}

impl<H> BlockRef<H> {
    /// Build a reference that carries only a hash and no handle.
    pub fn from_hash(hash: H) -> Self {
        BlockRef {
            hash,
            _pointer: None,
        }
    }

    /// Build a reference from a hash plus a handle, which is moved into an
    /// `Arc` shared by all clones of the returned value.
    pub fn new<P: BlockRefT>(hash: H, inner: P) -> Self {
        let handle: Arc<dyn BlockRefT> = Arc::new(inner);
        BlockRef {
            hash,
            _pointer: Some(handle),
        }
    }

    /// Return a copy of the block hash.
    pub fn hash(&self) -> H
    where
        H: Copy,
    {
        self.hash
    }
}

/// Marker trait for the handle types that can be stored inside a [`BlockRef`].
pub trait BlockRefT: Send + Sync + 'static {}

impl<H> From<H> for BlockRef<H> {
    /// Same as [`BlockRef::from_hash`].
    fn from(value: H) -> Self {
        Self::from_hash(value)
    }
}

impl<H: std::fmt::Debug> std::fmt::Debug for BlockRef<H> {
    /// Formats as a `BlockRef(..)` tuple containing only the hash.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut tuple = f.debug_tuple("BlockRef");
        tuple.field(&self.hash);
        tuple.finish()
    }
}

impl<H: PartialEq> PartialEq for BlockRef<H> {
    /// Two references are equal when their hashes are equal.
    fn eq(&self, other: &Self) -> bool {
        PartialEq::eq(&self.hash, &other.hash)
    }
}

impl<H: Eq> Eq for BlockRef<H> {}

impl<H: std::hash::Hash> std::hash::Hash for BlockRef<H> {
    /// Feeds only the block hash into `state`, consistent with `PartialEq`.
    fn hash<S: std::hash::Hasher>(&self, state: &mut S) {
        <H as std::hash::Hash>::hash(&self.hash, state);
    }
}

impl<H: PartialOrd> PartialOrd for BlockRef<H> {
    /// Orders references by their hashes.
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        PartialOrd::partial_cmp(&self.hash, &other.hash)
    }
}

impl<H: Ord> Ord for BlockRef<H> {
    /// Orders references by their hashes.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        Ord::cmp(&self.hash, &other.hash)
    }
}
pub struct StreamOf<T>(Pin<Box<dyn Stream<Item = T> + Send + 'static>>);
impl<T> Stream for StreamOf<T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.0.poll_next_unpin(cx)
}
}
impl<T> std::fmt::Debug for StreamOf<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("StreamOf").field(&"<stream>").finish()
}
}
impl<T> StreamOf<T> {
pub fn new(inner: Pin<Box<dyn Stream<Item = T> + Send + 'static>>) -> Self {
StreamOf(inner)
}
pub async fn next(&mut self) -> Option<T> {
StreamExt::next(self).await
}
}
/// A [`StreamOf`] whose items are `Result<T, BackendError>`.
pub type StreamOfResults<T> = StreamOf<Result<T, BackendError>>;
/// Status updates for a submitted transaction; this is the item type of the
/// stream returned by [`Backend::submit_transaction`].
///
/// `Hash` is the block hash type carried inside the [`BlockRef`]s.
// NOTE(review): the per-variant descriptions below are inferred from the variant
// names only — confirm against the backend implementations that produce them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionStatus<Hash> {
/// Presumably: the transaction has passed validation.
Validated,
/// Presumably: the transaction has been broadcast to other nodes.
Broadcasted,
/// Presumably: the transaction was in a best block but no longer is.
NoLongerInBestBlock,
/// Presumably: the transaction is included in a best block.
InBestBlock {
/// Reference to the block in question.
hash: BlockRef<Hash>,
},
/// Presumably: the transaction is included in a finalized block.
InFinalizedBlock {
/// Reference to the block in question.
hash: BlockRef<Hash>,
},
/// An error was reported for the transaction.
Error {
/// Human-readable description accompanying this status.
message: String,
},
/// The transaction was reported as invalid.
Invalid {
/// Human-readable description accompanying this status.
message: String,
},
/// The transaction was reported as dropped.
Dropped {
/// Human-readable description accompanying this status.
message: String,
},
}
/// A storage key paired with a value, both as raw bytes.
///
/// This is the item type of the streams returned by
/// [`Backend::storage_fetch_values`] and [`Backend::storage_fetch_descendant_values`].
#[derive(serde::Serialize, serde::Deserialize, Clone, PartialEq, Debug)]
pub struct StorageResponse {
/// The storage key, as bytes.
pub key: Vec<u8>,
/// The value associated with `key`, as bytes.
pub value: Vec<u8>,
}