use std::{fmt::Debug, marker::PhantomData};
use crate::{utils, ProviderCall};
use alloy_eips::{BlockId, BlockNumberOrTag};
use alloy_json_rpc::RpcRecv;
use alloy_network::BlockResponse;
use alloy_network_primitives::{BlockTransactionsKind, HeaderResponse};
use alloy_primitives::{Address, BlockHash, B256, B64};
use alloy_rpc_client::{ClientRef, RpcCall};
#[cfg(feature = "pubsub")]
use alloy_rpc_types_eth::pubsub::SubscriptionKind;
use alloy_transport::{TransportError, TransportResult};
use either::Either;
use futures::{Stream, StreamExt};
use serde_json::Value;
use std::time::Duration;
use super::FilterPollerBuilder;
#[derive(Clone, Debug, Default)]
pub struct EthGetBlockParams {
block: BlockId,
kind: BlockTransactionsKind,
}
impl serde::Serialize for EthGetBlockParams {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
use serde::ser::SerializeTuple;
let mut tuple = serializer.serialize_tuple(2)?;
match self.block {
BlockId::Hash(hash) => tuple.serialize_element(&hash.block_hash)?,
BlockId::Number(number) => tuple.serialize_element(&number)?,
}
if self.kind.is_hashes() {
tuple.serialize_element(&false)?;
} else {
tuple.serialize_element(&true)?
};
tuple.end()
}
}
impl EthGetBlockParams {
pub const fn new(block: BlockId, kind: BlockTransactionsKind) -> Self {
Self { block, kind }
}
}
#[must_use = "EthGetBlockBy must be awaited to execute the request"]
pub struct EthGetBlock<BlockResp>
where
BlockResp: alloy_network::BlockResponse + RpcRecv,
{
inner: GetBlockInner<BlockResp>,
block: BlockId,
kind: BlockTransactionsKind,
_pd: std::marker::PhantomData<BlockResp>,
}
impl<BlockResp> EthGetBlock<BlockResp>
where
BlockResp: alloy_network::BlockResponse + RpcRecv,
{
pub fn by_hash(hash: BlockHash, client: ClientRef<'_>) -> Self {
let params = EthGetBlockParams::default();
let call = client.request("eth_getBlockByHash", params);
Self::new_rpc(hash.into(), call)
}
pub fn by_number(number: BlockNumberOrTag, client: ClientRef<'_>) -> Self {
let params = EthGetBlockParams::default();
if number.is_pending() {
return Self::new_pending_rpc(client.request("eth_getBlockByNumber", params));
}
Self::new_rpc(number.into(), client.request("eth_getBlockByNumber", params))
}
}
impl<BlockResp> EthGetBlock<BlockResp>
where
BlockResp: alloy_network::BlockResponse + RpcRecv,
{
pub fn new_rpc(block: BlockId, inner: RpcCall<EthGetBlockParams, Option<BlockResp>>) -> Self {
Self {
block,
inner: GetBlockInner::RpcCall(inner),
kind: BlockTransactionsKind::Hashes,
_pd: PhantomData,
}
}
pub fn new_pending_rpc(inner: RpcCall<EthGetBlockParams, Value>) -> Self {
Self {
block: BlockId::pending(),
inner: GetBlockInner::PendingBlock(inner),
kind: BlockTransactionsKind::Hashes,
_pd: PhantomData,
}
}
pub fn new_provider(block: BlockId, producer: ProviderCallProducer<BlockResp>) -> Self {
Self {
block,
inner: GetBlockInner::ProviderCall(producer),
kind: BlockTransactionsKind::Hashes,
_pd: PhantomData,
}
}
pub const fn kind(mut self, kind: BlockTransactionsKind) -> Self {
self.kind = kind;
self
}
pub const fn full(mut self) -> Self {
self.kind = BlockTransactionsKind::Full;
self
}
pub const fn hashes(mut self) -> Self {
self.kind = BlockTransactionsKind::Hashes;
self
}
}
impl<BlockResp> std::future::IntoFuture for EthGetBlock<BlockResp>
where
BlockResp: alloy_network::BlockResponse + RpcRecv,
{
type Output = TransportResult<Option<BlockResp>>;
type IntoFuture = ProviderCall<EthGetBlockParams, Option<BlockResp>>;
fn into_future(self) -> Self::IntoFuture {
match self.inner {
GetBlockInner::RpcCall(call) => {
let rpc_call =
call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
let fut = async move {
let resp = rpc_call.await?;
let result =
if self.kind.is_hashes() { utils::convert_to_hashes(resp) } else { resp };
Ok(result)
};
ProviderCall::BoxedFuture(Box::pin(fut))
}
GetBlockInner::PendingBlock(call) => {
let rpc_call =
call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
let map_fut = async move {
let mut block = rpc_call.await?;
if block.is_null() {
return Ok(None);
}
tracing::trace!(pending_block = ?block.to_string());
if block.get("hash").is_none_or(|v| v.is_null()) {
block["hash"] = Value::String(format!("{}", B256::ZERO));
}
if block.get("nonce").is_none_or(|v| v.is_null()) {
block["nonce"] = Value::String(format!("{}", B64::ZERO));
}
if block.get("miner").is_none_or(|v| v.is_null())
|| block.get("beneficiary").is_none_or(|v| v.is_null())
{
block["miner"] = Value::String(format!("{}", Address::ZERO));
}
let block = serde_json::from_value(block.clone())
.map_err(|e| TransportError::deser_err(e, block.to_string()))?;
let block = if self.kind.is_hashes() {
utils::convert_to_hashes(Some(block))
} else {
Some(block)
};
Ok(block)
};
ProviderCall::BoxedFuture(Box::pin(map_fut))
}
GetBlockInner::ProviderCall(producer) => producer(self.kind),
}
}
}
impl<BlockResp> core::fmt::Debug for EthGetBlock<BlockResp>
where
BlockResp: BlockResponse + RpcRecv,
{
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
f.debug_struct("EthGetBlock").field("block", &self.block).field("kind", &self.kind).finish()
}
}
type ProviderCallProducer<BlockResp> =
Box<dyn Fn(BlockTransactionsKind) -> ProviderCall<EthGetBlockParams, Option<BlockResp>> + Send>;
enum GetBlockInner<BlockResp>
where
BlockResp: BlockResponse + RpcRecv,
{
RpcCall(RpcCall<EthGetBlockParams, Option<BlockResp>>),
PendingBlock(RpcCall<EthGetBlockParams, Value>),
ProviderCall(ProviderCallProducer<BlockResp>),
}
impl<BlockResp> core::fmt::Debug for GetBlockInner<BlockResp>
where
BlockResp: BlockResponse + RpcRecv,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::RpcCall(call) => f.debug_tuple("RpcCall").field(call).finish(),
Self::PendingBlock(call) => f.debug_tuple("PendingBlock").field(call).finish(),
Self::ProviderCall(_) => f.debug_struct("ProviderCall").finish(),
}
}
}
#[derive(Debug)]
#[must_use = "this builder does nothing unless you call `.into_stream`"]
pub struct WatchBlocks<BlockResp> {
poller: FilterPollerBuilder<B256>,
kind: BlockTransactionsKind,
_pd: std::marker::PhantomData<BlockResp>,
}
impl<BlockResp> WatchBlocks<BlockResp>
where
BlockResp: BlockResponse + RpcRecv,
{
pub(crate) const fn new(poller: FilterPollerBuilder<B256>) -> Self {
Self { poller, kind: BlockTransactionsKind::Hashes, _pd: PhantomData }
}
pub const fn full(mut self) -> Self {
self.kind = BlockTransactionsKind::Full;
self
}
pub const fn hashes(mut self) -> Self {
self.kind = BlockTransactionsKind::Hashes;
self
}
pub const fn set_channel_size(&mut self, channel_size: usize) {
self.poller.set_channel_size(channel_size);
}
pub fn set_limit(&mut self, limit: Option<usize>) {
self.poller.set_limit(limit);
}
pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
self.poller.set_poll_interval(poll_interval);
}
pub fn into_stream(self) -> impl Stream<Item = TransportResult<BlockResp>> + Unpin {
let client = self.poller.client();
let kind = self.kind;
let stream = self
.poller
.into_stream()
.then(move |hashes| utils::hashes_to_blocks(hashes, client.clone(), kind.into()))
.flat_map(|res| {
futures::stream::iter(match res {
Ok(blocks) => {
Either::Left(blocks.into_iter().filter_map(|block| block.map(Ok)))
}
Err(err) => Either::Right(std::iter::once(Err(err))),
})
});
Box::pin(stream)
}
}
#[derive(Debug)]
#[must_use = "this builder does nothing unless you call `.into_stream`"]
pub struct WatchHeaders<HeaderResp> {
poller: FilterPollerBuilder<B256>,
_pd: std::marker::PhantomData<HeaderResp>,
}
impl<HeaderResp> WatchHeaders<HeaderResp>
where
HeaderResp: HeaderResponse + RpcRecv,
{
pub(crate) const fn new(poller: FilterPollerBuilder<B256>) -> Self {
Self { poller, _pd: PhantomData }
}
pub const fn set_channel_size(&mut self, channel_size: usize) {
self.poller.set_channel_size(channel_size);
}
pub fn set_limit(&mut self, limit: Option<usize>) {
self.poller.set_limit(limit);
}
pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
self.poller.set_poll_interval(poll_interval);
}
pub fn into_stream(self) -> impl Stream<Item = TransportResult<HeaderResp>> + Unpin {
let client = self.poller.client();
let stream = self
.poller
.into_stream()
.then(move |hashes| utils::hashes_to_headers(hashes, client.clone()))
.flat_map(|res| {
futures::stream::iter(match res {
Ok(headers) => {
Either::Left(headers.into_iter().filter_map(|header| header.map(Ok)))
}
Err(err) => Either::Right(std::iter::once(Err(err))),
})
});
Box::pin(stream)
}
}
#[derive(Debug)]
#[must_use = "this does nothing unless you call `.into_stream`"]
#[cfg(feature = "pubsub")]
pub struct SubFullBlocks<N: alloy_network::Network> {
sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
client: alloy_rpc_client::WeakClient,
kind: BlockTransactionsKind,
}
#[cfg(feature = "pubsub")]
impl<N: alloy_network::Network> SubFullBlocks<N> {
pub const fn new(
sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
client: alloy_rpc_client::WeakClient,
) -> Self {
Self { sub, client, kind: BlockTransactionsKind::Hashes }
}
pub const fn full(mut self) -> Self {
self.kind = BlockTransactionsKind::Full;
self
}
pub const fn hashes(mut self) -> Self {
self.kind = BlockTransactionsKind::Hashes;
self
}
pub fn channel_size(mut self, size: usize) -> Self {
self.sub = self.sub.channel_size(size);
self
}
pub async fn into_stream(
self,
) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>> + Unpin> {
use alloy_network_primitives::HeaderResponse;
use futures::StreamExt;
let sub = self.sub.await?;
let stream = sub
.into_stream()
.then(move |resp| {
let hash = resp.hash();
let kind = self.kind;
let client_weak = self.client.clone();
async move {
let client = client_weak
.upgrade()
.ok_or(TransportError::local_usage_str("Client dropped"))?;
let call = client.request("eth_getBlockByHash", (hash, kind.is_full()));
let resp = call.await?;
if kind.is_hashes() {
Ok(utils::convert_to_hashes(resp))
} else {
Ok(resp)
}
}
})
.filter_map(|result| futures::future::ready(result.transpose()));
#[cfg(not(target_family = "wasm"))]
{
Ok(stream.boxed())
}
#[cfg(target_family = "wasm")]
{
Ok(stream.boxed_local())
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{Provider, ProviderBuilder};
#[tokio::test]
async fn test_pending_block_deser() {
let provider =
ProviderBuilder::new().connect_http("https://binance.llamarpc.com".parse().unwrap());
let res = provider.get_block_by_number(BlockNumberOrTag::Pending).full().await;
if let Err(err) = &res {
let err_str = err.to_string();
if err_str.contains("no response") || err.is_transport_error() {
eprintln!("skipping flaky response: {err:?}");
return;
}
}
let _block = res.unwrap();
}
}