1use alloy_network::{Ethereum, Network};
2use alloy_primitives::{BlockNumber, U64};
3use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient};
4use alloy_transport::RpcError;
5use async_stream::stream;
6use futures::{Stream, StreamExt};
7use lru::LruCache;
8use std::{marker::PhantomData, num::NonZeroUsize};
9
10#[cfg(feature = "pubsub")]
11use futures::{future::Either, FutureExt};
12
13const BLOCK_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10).unwrap();
15
16const MAX_RETRIES: usize = 3;
18
19const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;
21
22pub(crate) struct NewBlocks<N: Network = Ethereum> {
24 client: WeakClient,
25 next_yield: BlockNumber,
29 known_blocks: LruCache<BlockNumber, N::BlockResponse>,
31 _phantom: PhantomData<N>,
32}
33
34impl<N: Network> NewBlocks<N> {
35 pub(crate) fn new(client: WeakClient) -> Self {
36 Self {
37 client,
38 next_yield: NO_BLOCK_NUMBER,
39 known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
40 _phantom: PhantomData,
41 }
42 }
43
44 #[cfg(test)]
45 const fn with_next_yield(mut self, next_yield: u64) -> Self {
46 self.next_yield = next_yield;
47 self
48 }
49
50 pub(crate) fn into_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
51 #[cfg(feature = "pubsub")]
53 if let Some(client) = self.client.upgrade() {
54 if client.pubsub_frontend().is_some() {
55 let subscriber = self.into_subscription_stream().map(futures::stream::iter);
56 let subscriber = futures::stream::once(subscriber);
57 return Either::Left(subscriber.flatten().flatten());
58 }
59 }
60
61 #[cfg(feature = "pubsub")]
64 let right = Either::Right;
65 #[cfg(not(feature = "pubsub"))]
66 let right = std::convert::identity;
67 right(self.into_poll_stream())
68 }
69
70 #[cfg(feature = "pubsub")]
71 async fn into_subscription_stream(
72 self,
73 ) -> Option<impl Stream<Item = N::BlockResponse> + 'static> {
74 use alloy_consensus::BlockHeader;
75
76 let Some(client) = self.client.upgrade() else {
77 debug!("client dropped");
78 return None;
79 };
80 let Some(pubsub) = client.pubsub_frontend() else {
81 error!("pubsub_frontend returned None after being Some");
82 return None;
83 };
84 let id = match client.request("eth_subscribe", ("newHeads",)).await {
85 Ok(id) => id,
86 Err(err) => {
87 error!(%err, "failed to subscribe to newHeads");
88 return None;
89 }
90 };
91 let sub = match pubsub.get_subscription(id).await {
92 Ok(sub) => sub,
93 Err(err) => {
94 error!(%err, "failed to get subscription");
95 return None;
96 }
97 };
98 let stream =
99 sub.into_typed::<N::HeaderResponse>().into_stream().map(|header| header.number());
100 Some(self.into_block_stream(stream))
101 }
102
103 fn into_poll_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
104 let stream =
106 PollerBuilder::<NoParams, U64>::new(self.client.clone(), "eth_blockNumber", [])
107 .into_stream()
108 .map(|n| n.to());
109
110 self.into_block_stream(stream)
111 }
112
113 fn into_block_stream(
114 mut self,
115 mut numbers_stream: impl Stream<Item = u64> + Unpin + 'static,
116 ) -> impl Stream<Item = N::BlockResponse> + 'static {
117 stream! {
118 'task: loop {
119 while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
121 debug!(number=self.next_yield, "yielding block");
122 self.next_yield += 1;
123 yield known_block;
124 }
125
126 let Some(block_number) = numbers_stream.next().await else {
128 debug!("polling stream ended");
129 break 'task;
130 };
131 trace!(%block_number, "got block number");
132 if self.next_yield == NO_BLOCK_NUMBER {
133 assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
134 self.next_yield = block_number.saturating_sub(1);
138 } else if block_number < self.next_yield {
139 debug!(block_number, self.next_yield, "not advanced yet");
140 continue 'task;
141 }
142
143 let Some(client) = self.client.upgrade() else {
145 debug!("client dropped");
146 break 'task;
147 };
148
149 let mut retries = MAX_RETRIES;
152 for number in self.next_yield..=block_number {
153 debug!(number, "fetching block");
154 let block = match client.request("eth_getBlockByNumber", (U64::from(number), false)).await {
155 Ok(Some(block)) => block,
156 Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
157 debug!(number, %err, "failed to fetch block, retrying");
158 retries -= 1;
159 continue;
160 }
161 Ok(None) if retries > 0 => {
162 debug!(number, "failed to fetch block (doesn't exist), retrying");
163 retries -= 1;
164 continue;
165 }
166 Err(err) => {
167 error!(number, %err, "failed to fetch block");
168 break;
169 }
170 Ok(None) => {
171 error!(number, "failed to fetch block (doesn't exist)");
172 break;
173 }
174 };
175 self.known_blocks.put(number, block);
176 if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
177 debug!(number, "cache full");
179 break;
180 }
181 }
182 }
183 }
184 }
185}
186
187#[cfg(all(test, feature = "anvil-api"))] mod tests {
189 use super::*;
190 use crate::{ext::AnvilApi, Provider, ProviderBuilder};
191 use alloy_node_bindings::Anvil;
192 use std::{future::Future, time::Duration};
193
194 async fn timeout<T: Future>(future: T) -> T::Output {
195 try_timeout(future).await.expect("Timeout")
196 }
197
198 async fn try_timeout<T: Future>(future: T) -> Option<T::Output> {
199 tokio::time::timeout(Duration::from_secs(2), future).await.ok()
200 }
201
202 #[tokio::test]
203 async fn yield_block_http() {
204 yield_block(false).await;
205 }
206 #[tokio::test]
207 #[cfg(feature = "ws")]
208 async fn yield_block_ws() {
209 yield_block(true).await;
210 }
211 async fn yield_block(ws: bool) {
212 let anvil = Anvil::new().spawn();
213
214 let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
215 let provider = ProviderBuilder::new().connect(&url).await.unwrap();
216
217 let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
218 let mut stream = Box::pin(new_blocks.into_stream());
219 if ws {
220 let _ = try_timeout(stream.next()).await; }
222
223 provider.anvil_mine(Some(1), None).await.unwrap();
225
226 let block = timeout(stream.next()).await.expect("Block wasn't fetched");
227 assert_eq!(block.header.number, 1);
228 }
229
230 #[tokio::test]
231 async fn yield_many_blocks_http() {
232 yield_many_blocks(false).await;
233 }
234 #[tokio::test]
235 #[cfg(feature = "ws")]
236 async fn yield_many_blocks_ws() {
237 yield_many_blocks(true).await;
238 }
239 async fn yield_many_blocks(ws: bool) {
240 const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;
242
243 let anvil = Anvil::new().spawn();
244
245 let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
246 let provider = ProviderBuilder::new().connect(&url).await.unwrap();
247
248 let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
249 let mut stream = Box::pin(new_blocks.into_stream());
250 if ws {
251 let _ = try_timeout(stream.next()).await; }
253
254 provider.anvil_mine(Some(BLOCKS_TO_MINE as u64), None).await.unwrap();
256
257 let blocks = timeout(stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>()).await;
258 assert_eq!(blocks.len(), BLOCKS_TO_MINE);
259 let first = blocks[0].header.number;
260 assert_eq!(first, 1);
261 for (i, block) in blocks.iter().enumerate() {
262 assert_eq!(block.header.number, first + i as u64);
263 }
264 }
265}