use super::Block;
use crate::{
backend::{BlockRef, StreamOfResults},
client::OnlineClientT,
config::{Config, HashFor},
error::BlockError,
utils::PhantomDataSendSync,
};
use derive_where::derive_where;
use futures::StreamExt;
use std::future::Future;
type BlockStream<T> = StreamOfResults<T>;
type BlockStreamRes<T> = Result<BlockStream<T>, BlockError>;
#[derive_where(Clone; Client)]
pub struct BlocksClient<T, Client> {
client: Client,
_marker: PhantomDataSendSync<T>,
}
impl<T, Client> BlocksClient<T, Client> {
pub fn new(client: Client) -> Self {
Self { client, _marker: PhantomDataSendSync::new() }
}
}
impl<T, Client> BlocksClient<T, Client>
where
T: Config,
Client: OnlineClientT<T>,
{
pub fn at(
&self,
block_ref: impl Into<BlockRef<HashFor<T>>>,
) -> impl Future<Output = Result<Block<T, Client>, BlockError>> + Send + 'static {
self.at_or_latest(Some(block_ref.into()))
}
pub fn at_latest(
&self,
) -> impl Future<Output = Result<Block<T, Client>, BlockError>> + Send + 'static {
self.at_or_latest(None)
}
fn at_or_latest(
&self,
block_ref: Option<BlockRef<HashFor<T>>>,
) -> impl Future<Output = Result<Block<T, Client>, BlockError>> + Send + 'static {
let client = self.client.clone();
async move {
let block_ref = match block_ref {
Some(r) => r,
None => client
.backend()
.latest_finalized_block_ref()
.await
.map_err(BlockError::CouldNotGetLatestBlock)?,
};
let maybe_block_header =
client.backend().block_header(block_ref.hash()).await.map_err(|e| {
BlockError::CouldNotGetBlockHeader {
block_hash: block_ref.hash().into(),
reason: e,
}
})?;
let block_header = match maybe_block_header {
Some(header) => header,
None => {
return Err(BlockError::BlockNotFound { block_hash: block_ref.hash().into() });
},
};
Ok(Block::new(block_header, block_ref, client))
}
}
pub fn subscribe_all(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, BlockError>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
let hasher = client.hasher();
header_sub_fut_to_block_sub(self.clone(), async move {
let stream = client
.backend()
.stream_all_block_headers(hasher)
.await
.map_err(BlockError::CouldNotSubscribeToAllBlocks)?;
BlockStreamRes::Ok(stream)
})
}
pub fn subscribe_best(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, BlockError>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
let hasher = client.hasher();
header_sub_fut_to_block_sub(self.clone(), async move {
let stream = client
.backend()
.stream_best_block_headers(hasher)
.await
.map_err(BlockError::CouldNotSubscribeToBestBlocks)?;
BlockStreamRes::Ok(stream)
})
}
pub fn subscribe_finalized(
&self,
) -> impl Future<Output = Result<BlockStream<Block<T, Client>>, BlockError>> + Send + 'static
where
Client: Send + Sync + 'static,
{
let client = self.client.clone();
let hasher = client.hasher();
header_sub_fut_to_block_sub(self.clone(), async move {
let stream = client
.backend()
.stream_finalized_block_headers(hasher)
.await
.map_err(BlockError::CouldNotSubscribeToFinalizedBlocks)?;
BlockStreamRes::Ok(stream)
})
}
}
async fn header_sub_fut_to_block_sub<T, Client, S>(
blocks_client: BlocksClient<T, Client>,
sub: S,
) -> Result<BlockStream<Block<T, Client>>, BlockError>
where
T: Config,
S: Future<Output = Result<BlockStream<(T::Header, BlockRef<HashFor<T>>)>, BlockError>>
+ Send
+ 'static,
Client: OnlineClientT<T> + Send + Sync + 'static,
{
let sub = sub.await?.then(move |header_and_ref| {
let client = blocks_client.client.clone();
async move {
let (header, block_ref) = match header_and_ref {
Ok(header_and_ref) => header_and_ref,
Err(e) => return Err(e),
};
Ok(Block::new(header, block_ref, client))
}
});
BlockStreamRes::Ok(StreamOfResults::new(Box::pin(sub)))
}