1use std::{fmt::Debug, marker::PhantomData};
2
3use crate::{utils, ProviderCall};
4use alloy_eips::{BlockId, BlockNumberOrTag};
5use alloy_json_rpc::RpcRecv;
6use alloy_network::BlockResponse;
7use alloy_network_primitives::{BlockTransactionsKind, HeaderResponse};
8use alloy_primitives::{Address, BlockHash, B256, B64};
9use alloy_rpc_client::{ClientRef, RpcCall};
10#[cfg(feature = "pubsub")]
11use alloy_rpc_types_eth::pubsub::SubscriptionKind;
12use alloy_transport::{TransportError, TransportResult};
13use either::Either;
14use futures::{Stream, StreamExt};
15use serde_json::Value;
16use std::time::Duration;
17
18use super::FilterPollerBuilder;
19
20#[derive(Clone, Debug, Default)]
24pub struct EthGetBlockParams {
25 block: BlockId,
26 kind: BlockTransactionsKind,
27}
28
29impl serde::Serialize for EthGetBlockParams {
30 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
31 where
32 S: serde::Serializer,
33 {
34 use serde::ser::SerializeTuple;
35
36 let mut tuple = serializer.serialize_tuple(2)?;
37 match self.block {
38 BlockId::Hash(hash) => tuple.serialize_element(&hash.block_hash)?,
39 BlockId::Number(number) => tuple.serialize_element(&number)?,
40 }
41 if self.kind.is_hashes() {
42 tuple.serialize_element(&false)?;
43 } else {
44 tuple.serialize_element(&true)?
45 };
46 tuple.end()
47 }
48}
49
50impl EthGetBlockParams {
51 pub const fn new(block: BlockId, kind: BlockTransactionsKind) -> Self {
53 Self { block, kind }
54 }
55}
56
57#[must_use = "EthGetBlockBy must be awaited to execute the request"]
62pub struct EthGetBlock<BlockResp>
64where
65 BlockResp: alloy_network::BlockResponse + RpcRecv,
66{
67 inner: GetBlockInner<BlockResp>,
68 block: BlockId,
69 kind: BlockTransactionsKind,
70 _pd: std::marker::PhantomData<BlockResp>,
71}
72
73impl<BlockResp> EthGetBlock<BlockResp>
74where
75 BlockResp: alloy_network::BlockResponse + RpcRecv,
76{
77 pub fn by_hash(hash: BlockHash, client: ClientRef<'_>) -> Self {
80 let params = EthGetBlockParams::default();
81 let call = client.request("eth_getBlockByHash", params);
82 Self::new_rpc(hash.into(), call)
83 }
84
85 pub fn by_number(number: BlockNumberOrTag, client: ClientRef<'_>) -> Self {
88 let params = EthGetBlockParams::default();
89
90 if number.is_pending() {
91 return Self::new_pending_rpc(client.request("eth_getBlockByNumber", params));
92 }
93
94 Self::new_rpc(number.into(), client.request("eth_getBlockByNumber", params))
95 }
96}
97
98impl<BlockResp> EthGetBlock<BlockResp>
99where
100 BlockResp: alloy_network::BlockResponse + RpcRecv,
101{
102 pub fn new_rpc(block: BlockId, inner: RpcCall<EthGetBlockParams, Option<BlockResp>>) -> Self {
104 Self {
105 block,
106 inner: GetBlockInner::RpcCall(inner),
107 kind: BlockTransactionsKind::Hashes,
108 _pd: PhantomData,
109 }
110 }
111
112 pub fn new_pending_rpc(inner: RpcCall<EthGetBlockParams, Value>) -> Self {
114 Self {
115 block: BlockId::pending(),
116 inner: GetBlockInner::PendingBlock(inner),
117 kind: BlockTransactionsKind::Hashes,
118 _pd: PhantomData,
119 }
120 }
121
122 pub fn new_provider(block: BlockId, producer: ProviderCallProducer<BlockResp>) -> Self {
124 Self {
125 block,
126 inner: GetBlockInner::ProviderCall(producer),
127 kind: BlockTransactionsKind::Hashes,
128 _pd: PhantomData,
129 }
130 }
131
132 pub const fn kind(mut self, kind: BlockTransactionsKind) -> Self {
134 self.kind = kind;
135 self
136 }
137
138 pub const fn full(mut self) -> Self {
140 self.kind = BlockTransactionsKind::Full;
141 self
142 }
143
144 pub const fn hashes(mut self) -> Self {
146 self.kind = BlockTransactionsKind::Hashes;
147 self
148 }
149}
150
151impl<BlockResp> std::future::IntoFuture for EthGetBlock<BlockResp>
152where
153 BlockResp: alloy_network::BlockResponse + RpcRecv,
154{
155 type Output = TransportResult<Option<BlockResp>>;
156
157 type IntoFuture = ProviderCall<EthGetBlockParams, Option<BlockResp>>;
158
159 fn into_future(self) -> Self::IntoFuture {
160 match self.inner {
161 GetBlockInner::RpcCall(call) => {
162 let rpc_call =
163 call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
164
165 let fut = async move {
166 let resp = rpc_call.await?;
167 let result =
168 if self.kind.is_hashes() { utils::convert_to_hashes(resp) } else { resp };
169 Ok(result)
170 };
171
172 ProviderCall::BoxedFuture(Box::pin(fut))
173 }
174 GetBlockInner::PendingBlock(call) => {
175 let rpc_call =
176 call.map_params(|_params| EthGetBlockParams::new(self.block, self.kind));
177
178 let map_fut = async move {
179 let mut block = rpc_call.await?;
180
181 if block.is_null() {
182 return Ok(None);
183 }
184
185 tracing::trace!(pending_block = ?block.to_string());
188 if block.get("hash").is_none_or(|v| v.is_null()) {
189 block["hash"] = Value::String(format!("{}", B256::ZERO));
190 }
191
192 if block.get("nonce").is_none_or(|v| v.is_null()) {
193 block["nonce"] = Value::String(format!("{}", B64::ZERO));
194 }
195
196 if block.get("miner").is_none_or(|v| v.is_null())
197 || block.get("beneficiary").is_none_or(|v| v.is_null())
198 {
199 block["miner"] = Value::String(format!("{}", Address::ZERO));
200 }
201
202 let block = serde_json::from_value(block.clone())
203 .map_err(|e| TransportError::deser_err(e, block.to_string()))?;
204
205 let block = if self.kind.is_hashes() {
206 utils::convert_to_hashes(Some(block))
207 } else {
208 Some(block)
209 };
210
211 Ok(block)
212 };
213
214 ProviderCall::BoxedFuture(Box::pin(map_fut))
215 }
216 GetBlockInner::ProviderCall(producer) => producer(self.kind),
217 }
218 }
219}
220
221impl<BlockResp> core::fmt::Debug for EthGetBlock<BlockResp>
222where
223 BlockResp: BlockResponse + RpcRecv,
224{
225 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
226 f.debug_struct("EthGetBlock").field("block", &self.block).field("kind", &self.kind).finish()
227 }
228}
229
230type ProviderCallProducer<BlockResp> =
231 Box<dyn Fn(BlockTransactionsKind) -> ProviderCall<EthGetBlockParams, Option<BlockResp>> + Send>;
232
233enum GetBlockInner<BlockResp>
234where
235 BlockResp: BlockResponse + RpcRecv,
236{
237 RpcCall(RpcCall<EthGetBlockParams, Option<BlockResp>>),
239 PendingBlock(RpcCall<EthGetBlockParams, Value>),
249 ProviderCall(ProviderCallProducer<BlockResp>),
251}
252
253impl<BlockResp> core::fmt::Debug for GetBlockInner<BlockResp>
254where
255 BlockResp: BlockResponse + RpcRecv,
256{
257 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
258 match self {
259 Self::RpcCall(call) => f.debug_tuple("RpcCall").field(call).finish(),
260 Self::PendingBlock(call) => f.debug_tuple("PendingBlock").field(call).finish(),
261 Self::ProviderCall(_) => f.debug_struct("ProviderCall").finish(),
262 }
263 }
264}
265
266#[derive(Debug)]
275#[must_use = "this builder does nothing unless you call `.into_stream`"]
276pub struct WatchBlocks<BlockResp> {
277 poller: FilterPollerBuilder<B256>,
284 kind: BlockTransactionsKind,
291 _pd: std::marker::PhantomData<BlockResp>,
292}
293
294impl<BlockResp> WatchBlocks<BlockResp>
295where
296 BlockResp: BlockResponse + RpcRecv,
297{
298 pub(crate) const fn new(poller: FilterPollerBuilder<B256>) -> Self {
300 Self { poller, kind: BlockTransactionsKind::Hashes, _pd: PhantomData }
301 }
302
303 pub const fn full(mut self) -> Self {
305 self.kind = BlockTransactionsKind::Full;
306 self
307 }
308
309 pub const fn hashes(mut self) -> Self {
311 self.kind = BlockTransactionsKind::Hashes;
312 self
313 }
314
315 pub const fn set_channel_size(&mut self, channel_size: usize) {
317 self.poller.set_channel_size(channel_size);
318 }
319
320 pub fn set_limit(&mut self, limit: Option<usize>) {
322 self.poller.set_limit(limit);
323 }
324
325 pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
327 self.poller.set_poll_interval(poll_interval);
328 }
329
330 pub fn into_stream(self) -> impl Stream<Item = TransportResult<BlockResp>> + Unpin {
333 let client = self.poller.client();
334 let kind = self.kind;
335 let stream = self
336 .poller
337 .into_stream()
338 .then(move |hashes| utils::hashes_to_blocks(hashes, client.clone(), kind.into()))
339 .flat_map(|res| {
340 futures::stream::iter(match res {
341 Ok(blocks) => {
342 Either::Left(blocks.into_iter().filter_map(|block| block.map(Ok)))
344 }
345 Err(err) => Either::Right(std::iter::once(Err(err))),
346 })
347 });
348 Box::pin(stream)
349 }
350}
351
352#[derive(Debug)]
356#[must_use = "this builder does nothing unless you call `.into_stream`"]
357pub struct WatchHeaders<HeaderResp> {
358 poller: FilterPollerBuilder<B256>,
359 _pd: std::marker::PhantomData<HeaderResp>,
360}
361
362impl<HeaderResp> WatchHeaders<HeaderResp>
363where
364 HeaderResp: HeaderResponse + RpcRecv,
365{
366 pub(crate) const fn new(poller: FilterPollerBuilder<B256>) -> Self {
368 Self { poller, _pd: PhantomData }
369 }
370
371 pub const fn set_channel_size(&mut self, channel_size: usize) {
373 self.poller.set_channel_size(channel_size);
374 }
375
376 pub fn set_limit(&mut self, limit: Option<usize>) {
378 self.poller.set_limit(limit);
379 }
380
381 pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
383 self.poller.set_poll_interval(poll_interval);
384 }
385
386 pub fn into_stream(self) -> impl Stream<Item = TransportResult<HeaderResp>> + Unpin {
389 let client = self.poller.client();
390 let stream = self
391 .poller
392 .into_stream()
393 .then(move |hashes| utils::hashes_to_headers(hashes, client.clone()))
394 .flat_map(|res| {
395 futures::stream::iter(match res {
396 Ok(headers) => {
397 Either::Left(headers.into_iter().filter_map(|header| header.map(Ok)))
398 }
399 Err(err) => Either::Right(std::iter::once(Err(err))),
400 })
401 });
402 Box::pin(stream)
403 }
404}
405
/// Builder for a stream of blocks driven by a header subscription: every header received
/// from the subscription is turned into a block by fetching it by hash.
#[cfg(feature = "pubsub")]
#[derive(Debug)]
#[must_use = "this does nothing unless you call `.into_stream`"]
pub struct SubFullBlocks<N: alloy_network::Network> {
    /// Subscription that yields header responses.
    sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
    /// Weak handle to the RPC client used to fetch the block for each header.
    client: alloy_rpc_client::WeakClient,
    /// Whether fetched blocks carry transaction hashes or full transactions.
    kind: BlockTransactionsKind,
}
419
#[cfg(feature = "pubsub")]
impl<N: alloy_network::Network> SubFullBlocks<N> {
    /// Creates a builder from a header subscription and a weak client handle.
    ///
    /// The transaction kind starts out as `Hashes`.
    pub const fn new(
        sub: super::GetSubscription<(SubscriptionKind,), N::HeaderResponse>,
        client: alloy_rpc_client::WeakClient,
    ) -> Self {
        Self { sub, client, kind: BlockTransactionsKind::Hashes }
    }

    /// Streams blocks with full transaction objects.
    pub const fn full(mut self) -> Self {
        self.kind = BlockTransactionsKind::Full;
        self
    }

    /// Streams blocks with transaction hashes only.
    pub const fn hashes(mut self) -> Self {
        self.kind = BlockTransactionsKind::Hashes;
        self
    }

    /// Sets the channel size of the underlying subscription.
    pub fn channel_size(mut self, size: usize) -> Self {
        self.sub = self.sub.channel_size(size);
        self
    }

    /// Establishes the subscription and returns a stream of blocks.
    ///
    /// For every header received, the block is fetched with `eth_getBlockByHash`. Blocks
    /// for which the node answers `None` are skipped.
    ///
    /// # Errors
    ///
    /// Returns an error if the subscription cannot be established. Individual stream items
    /// are errors when the client has been dropped or the block request fails.
    pub async fn into_stream(
        self,
    ) -> TransportResult<impl Stream<Item = TransportResult<N::BlockResponse>> + Unpin> {
        use alloy_network_primitives::HeaderResponse;
        use futures::StreamExt;

        let Self { sub, client, kind } = self;
        let sub = sub.await?;

        let stream = sub
            .into_stream()
            .then(move |header| {
                let hash = header.hash();
                let client_weak = client.clone();

                async move {
                    // Build the error lazily: it is only needed once the client is gone,
                    // not for every header that arrives.
                    let client = client_weak
                        .upgrade()
                        .ok_or_else(|| TransportError::local_usage_str("Client dropped"))?;

                    let call = client.request("eth_getBlockByHash", (hash, kind.is_full()));
                    let resp = call.await?;

                    if kind.is_hashes() {
                        Ok(utils::convert_to_hashes(resp))
                    } else {
                        Ok(resp)
                    }
                }
            })
            // `Ok(None)` becomes `None` (skipped); `Ok(Some(b))` and `Err(e)` are yielded.
            .filter_map(|result| futures::future::ready(result.transpose()));

        #[cfg(not(target_family = "wasm"))]
        {
            Ok(stream.boxed())
        }

        #[cfg(target_family = "wasm")]
        {
            Ok(stream.boxed_local())
        }
    }
}
495
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Provider, ProviderBuilder};

    /// Fetches the pending block with full transactions from a public endpoint and checks
    /// that the response deserializes. Transport-level failures are treated as flaky and
    /// skip the test instead of failing it.
    #[tokio::test]
    async fn test_pending_block_deser() {
        let url = "https://binance.llamarpc.com".parse().unwrap();
        let provider = ProviderBuilder::new().connect_http(url);

        match provider.get_block_by_number(BlockNumberOrTag::Pending).full().await {
            Err(err) if err.to_string().contains("no response") || err.is_transport_error() => {
                eprintln!("skipping flaky response: {err:?}");
            }
            outcome => {
                let _block = outcome.unwrap();
            }
        }
    }
}