1use alloy_network::{Ethereum, Network};
2use alloy_primitives::{BlockNumber, U64};
3use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient};
4use alloy_transport::RpcError;
5use async_stream::stream;
6use futures::{Stream, StreamExt};
7use lru::LruCache;
8use std::{
9 marker::PhantomData,
10 num::NonZeroUsize,
11 sync::{
12 atomic::{AtomicBool, Ordering},
13 Arc,
14 },
15};
16
17#[cfg(feature = "pubsub")]
18use futures::{future::Either, FutureExt};
19
/// Capacity of the LRU cache that holds fetched blocks until they are yielded.
///
/// The fetch loop stops filling as soon as the cache holds this many blocks.
const BLOCK_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10).unwrap();

/// Number of times a single block fetch is retried before that block is given up on.
const MAX_RETRIES: usize = 3;

/// Sentinel for `next_yield` meaning "no starting block chosen yet"; it is replaced
/// once the first tip number has been received.
const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;
28
29#[derive(Default)]
30pub(crate) struct Paused {
31 is_paused: AtomicBool,
32 notify: tokio::sync::Notify,
33}
34
35impl Paused {
36 pub(crate) fn is_paused(&self) -> bool {
37 self.is_paused.load(Ordering::Acquire)
38 }
39
40 pub(crate) fn set_paused(&self, paused: bool) {
41 self.is_paused.store(paused, Ordering::Release);
42 if !paused {
43 self.notify.notify_waiters();
44 }
45 }
46
47 async fn wait(&self) -> bool {
52 if !self.is_paused() {
53 return false;
54 }
55 self.notify.notified().await;
56 debug_assert!(!self.is_paused());
57 true
58 }
59}
60
/// Streams new blocks from the client.
pub(crate) struct NewBlocks<N: Network = Ethereum> {
    /// Weak handle to the RPC client; upgraded whenever a request has to be made.
    client: WeakClient,
    /// Number of the next block to yield.
    ///
    /// Holds [`NO_BLOCK_NUMBER`] until the first tip number has been received.
    next_yield: BlockNumber,
    /// LRU cache of fetched blocks, keyed by block number, waiting to be yielded in order.
    known_blocks: LruCache<BlockNumber, N::BlockResponse>,
    /// Shared pause flag; the block stream waits on it before asking for the next tip.
    pub(crate) paused: Arc<Paused>,
    _phantom: PhantomData<N>,
}
73
74impl<N: Network> NewBlocks<N> {
75 pub(crate) fn new(client: WeakClient) -> Self {
76 Self {
77 client,
78 next_yield: NO_BLOCK_NUMBER,
79 known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
80 paused: Arc::default(),
81 _phantom: PhantomData,
82 }
83 }
84
/// Test-only builder that overrides the number of the next block to yield.
#[cfg(test)]
const fn with_next_yield(self, next_yield: u64) -> Self {
    // Move the whole value (no struct-update syntax) so this stays valid as a `const fn`.
    let mut updated = self;
    updated.next_yield = next_yield;
    updated
}
90
91 pub(crate) fn into_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
92 #[cfg(feature = "pubsub")]
94 if let Some(client) = self.client.upgrade() {
95 if client.pubsub_frontend().is_some() {
96 let subscriber = self.into_subscription_stream().map(futures::stream::iter);
97 let subscriber = futures::stream::once(subscriber);
98 return Either::Left(subscriber.flatten().flatten());
99 }
100 }
101
102 #[cfg(feature = "pubsub")]
105 let right = Either::Right;
106 #[cfg(not(feature = "pubsub"))]
107 let right = std::convert::identity;
108 right(self.into_poll_stream())
109 }
110
/// Subscribes to `newHeads` and builds the block stream from the announced header numbers.
///
/// Returns `None` (after logging the reason) if the client has been dropped, has no
/// pubsub frontend, or if creating or retrieving the subscription fails.
#[cfg(feature = "pubsub")]
async fn into_subscription_stream(
    self,
) -> Option<impl Stream<Item = N::BlockResponse> + 'static> {
    use alloy_consensus::BlockHeader;

    let client = match self.client.upgrade() {
        Some(client) => client,
        None => {
            debug!("client dropped");
            return None;
        }
    };
    let pubsub = match client.pubsub_frontend() {
        Some(pubsub) => pubsub,
        None => {
            error!("pubsub_frontend returned None after being Some");
            return None;
        }
    };

    let subscribed = client.request("eth_subscribe", ("newHeads",)).await;
    let id = match subscribed {
        Err(err) => {
            error!(%err, "failed to subscribe to newHeads");
            return None;
        }
        Ok(id) => id,
    };

    let fetched = pubsub.get_subscription(id).await;
    let subscription = match fetched {
        Err(err) => {
            error!(%err, "failed to get subscription");
            return None;
        }
        Ok(subscription) => subscription,
    };

    // Only the block number of each announced header is needed downstream.
    let numbers = subscription
        .into_typed::<N::HeaderResponse>()
        .into_stream()
        .map(|header| header.number());
    Some(self.into_block_stream(numbers))
}
143
144 fn into_poll_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
145 let stream =
147 PollerBuilder::<NoParams, U64>::new(self.client.clone(), "eth_blockNumber", [])
148 .into_stream()
149 .map(|n| n.to());
150
151 self.into_block_stream(stream)
152 }
153
154 fn into_block_stream(
155 mut self,
156 mut numbers_stream: impl Stream<Item = u64> + Unpin + 'static,
157 ) -> impl Stream<Item = N::BlockResponse> + 'static {
158 stream! {
159 'task: loop {
160 while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
162 debug!(number=self.next_yield, "yielding block");
163 self.next_yield += 1;
164 yield known_block;
165 }
166
167 let unpaused = self.paused.wait().await;
170
171 let Some(block_number) = numbers_stream.next().await else {
173 debug!("polling stream ended");
174 break 'task;
175 };
176 trace!(%block_number, "got block number");
177 if self.next_yield == NO_BLOCK_NUMBER || unpaused {
178 assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
179 self.next_yield = block_number.saturating_sub(1);
183 } else if block_number < self.next_yield {
184 debug!(block_number, self.next_yield, "not advanced yet");
185 continue 'task;
186 }
187
188 let Some(client) = self.client.upgrade() else {
190 debug!("client dropped");
191 break 'task;
192 };
193
194 for number in self.next_yield..=block_number {
197 debug!(number, "fetching block");
198 let mut retries = MAX_RETRIES;
199 let block = loop {
200 match client.request("eth_getBlockByNumber", (U64::from(number), false)).await {
201 Ok(Some(block)) => break Some(block),
202 Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
203 debug!(number, %err, "failed to fetch block, retrying");
204 retries -= 1;
205 }
206 Ok(None) if retries > 0 => {
207 debug!(number, "failed to fetch block (doesn't exist), retrying");
208 retries -= 1;
209 }
210 Err(err) => {
211 error!(number, %err, "failed to fetch block");
212 break None;
213 }
214 Ok(None) => {
215 error!(number, "failed to fetch block (doesn't exist)");
216 break None;
217 }
218 }
219 };
220 let Some(block) = block else {
221 break;
222 };
223 self.known_blocks.put(number, block);
224 if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
225 debug!(number, "cache full");
227 break;
228 }
229 }
230 }
231 }
232 }
233}
234
#[cfg(all(test, feature = "anvil-api"))]
mod tests {
    use super::*;
    use crate::{ext::AnvilApi, Provider, ProviderBuilder};
    use alloy_node_bindings::Anvil;
    use std::{future::Future, time::Duration};

    /// Upper bound on how long a test waits for a future to resolve.
    const WAIT_LIMIT: Duration = Duration::from_secs(2);

    /// Awaits `future`, panicking with "Timeout" if it does not resolve in time.
    async fn timeout<T: Future>(future: T) -> T::Output {
        match try_timeout(future).await {
            Some(output) => output,
            None => panic!("Timeout"),
        }
    }

    /// Awaits `future`, returning `None` if it does not resolve within [`WAIT_LIMIT`].
    async fn try_timeout<T: Future>(future: T) -> Option<T::Output> {
        let outcome = tokio::time::timeout(WAIT_LIMIT, future).await;
        outcome.ok()
    }

    #[tokio::test]
    async fn yield_block_http() {
        yield_block(false).await;
    }

    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_block_ws() {
        yield_block(true).await;
    }

    /// Mines a single block and checks that the stream yields block 1.
    async fn yield_block(ws: bool) {
        let node = Anvil::new().spawn();

        let endpoint = if ws { node.ws_endpoint() } else { node.endpoint() };
        let provider = ProviderBuilder::new().connect(&endpoint).await.unwrap();

        let streamer = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut blocks = Box::pin(streamer.into_stream());
        if ws {
            // Poll once before mining; no block is expected yet, so the outcome is discarded.
            let _ = try_timeout(blocks.next()).await;
        }

        provider.anvil_mine(Some(1), None).await.unwrap();

        let mined = timeout(blocks.next()).await.expect("Block wasn't fetched");
        assert_eq!(mined.header.number, 1);
    }

    #[tokio::test]
    async fn yield_many_blocks_http() {
        yield_many_blocks(false).await;
    }

    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_many_blocks_ws() {
        yield_many_blocks(true).await;
    }

    /// Mines one block more than the cache can hold and checks that all of them are
    /// yielded, in order, starting at block 1.
    async fn yield_many_blocks(ws: bool) {
        const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;

        let node = Anvil::new().spawn();

        let endpoint = if ws { node.ws_endpoint() } else { node.endpoint() };
        let provider = ProviderBuilder::new().connect(&endpoint).await.unwrap();

        let streamer = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut stream = Box::pin(streamer.into_stream());
        if ws {
            // Poll once before mining; no block is expected yet, so the outcome is discarded.
            let _ = try_timeout(stream.next()).await;
        }

        provider.anvil_mine(Some(BLOCKS_TO_MINE as u64), None).await.unwrap();

        let collected = stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>();
        let blocks = timeout(collected).await;
        assert_eq!(blocks.len(), BLOCKS_TO_MINE);
        let first = blocks[0].header.number;
        assert_eq!(first, 1);
        for (offset, block) in (0u64..).zip(&blocks) {
            assert_eq!(block.header.number, first + offset);
        }
    }
}