alloy_provider/blocks.rs

use alloy_network::{Ethereum, Network};
use alloy_primitives::{BlockNumber, U64};
use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient};
use alloy_transport::RpcError;
use async_stream::stream;
use futures::{Stream, StreamExt};
use lru::LruCache;
use std::{marker::PhantomData, num::NonZeroUsize};

#[cfg(feature = "pubsub")]
use futures::{future::Either, FutureExt};

/// The size of the block cache.
const BLOCK_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10).unwrap();

/// Maximum number of retries for fetching a block.
const MAX_RETRIES: usize = 3;

/// Default block number for when we don't have a block yet.
const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;

/// Streams new blocks from the client.
pub(crate) struct NewBlocks<N: Network = Ethereum> {
    client: WeakClient,
    /// The next block to yield.
    /// [`NO_BLOCK_NUMBER`] indicates that it will be updated on the first poll.
    /// Only used by the polling task.
    next_yield: BlockNumber,
    /// LRU cache of known blocks. Only used by the polling task.
    known_blocks: LruCache<BlockNumber, N::BlockResponse>,
    _phantom: PhantomData<N>,
}

impl<N: Network> NewBlocks<N> {
    pub(crate) fn new(client: WeakClient) -> Self {
        Self {
            client,
            next_yield: NO_BLOCK_NUMBER,
            known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
            _phantom: PhantomData,
        }
    }

    #[cfg(test)]
    const fn with_next_yield(mut self, next_yield: u64) -> Self {
        self.next_yield = next_yield;
        self
    }

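    /// Converts this into a stream of block responses.
    ///
    /// A minimal consumption sketch, mirroring the tests below; the `provider` handle and its
    /// `weak_client()` are assumed to come from an already-connected `Provider`:
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client());
    /// let mut stream = Box::pin(new_blocks.into_stream());
    /// while let Some(block) = stream.next().await {
    ///     println!("new block: {block:?}");
    /// }
    /// ```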
    pub(crate) fn into_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
        // Return a stream that lazily subscribes to `newHeads` on the first poll.
        #[cfg(feature = "pubsub")]
        if let Some(client) = self.client.upgrade() {
            if client.pubsub_frontend().is_some() {
                let subscriber = self.into_subscription_stream().map(futures::stream::iter);
                let subscriber = futures::stream::once(subscriber);
                return Either::Left(subscriber.flatten().flatten());
            }
        }

        // Returns a stream that lazily initializes an `eth_blockNumber` polling task on the first
        // poll, mapped with `eth_getBlockByNumber`.
        #[cfg(feature = "pubsub")]
        let right = Either::Right;
        #[cfg(not(feature = "pubsub"))]
        let right = std::convert::identity;
        right(self.into_poll_stream())
    }

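    /// Subscribes to `newHeads` via the client's pubsub frontend, maps each received header to
    /// its block number, and feeds those numbers into [`Self::into_block_stream`].
    ///
    /// Returns `None` if the client has been dropped or the subscription could not be
    /// established.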
    #[cfg(feature = "pubsub")]
    async fn into_subscription_stream(
        self,
    ) -> Option<impl Stream<Item = N::BlockResponse> + 'static> {
        use alloy_consensus::BlockHeader;

        let Some(client) = self.client.upgrade() else {
            debug!("client dropped");
            return None;
        };
        let Some(pubsub) = client.pubsub_frontend() else {
            error!("pubsub_frontend returned None after being Some");
            return None;
        };
        let id = match client.request("eth_subscribe", ("newHeads",)).await {
            Ok(id) => id,
            Err(err) => {
                error!(%err, "failed to subscribe to newHeads");
                return None;
            }
        };
        let sub = match pubsub.get_subscription(id).await {
            Ok(sub) => sub,
            Err(err) => {
                error!(%err, "failed to get subscription");
                return None;
            }
        };
        let stream =
            sub.into_typed::<N::HeaderResponse>().into_stream().map(|header| header.number());
        Some(self.into_block_stream(stream))
    }

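    /// Polls `eth_blockNumber` for the current tip and feeds the returned numbers into
    /// [`Self::into_block_stream`].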
    fn into_poll_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
        // Spawned lazily on the first `poll`.
        let stream =
            PollerBuilder::<NoParams, U64>::new(self.client.clone(), "eth_blockNumber", [])
                .into_stream()
                .map(|n| n.to());

        self.into_block_stream(stream)
    }

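    /// Turns a stream of chain-tip block numbers into a stream of block responses.
    ///
    /// For every new tip, fetches the blocks from `next_yield` up to the tip with
    /// `eth_getBlockByNumber`, buffers them in the LRU cache, and yields them in order of
    /// block number.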
    fn into_block_stream(
        mut self,
        mut numbers_stream: impl Stream<Item = u64> + Unpin + 'static,
    ) -> impl Stream<Item = N::BlockResponse> + 'static {
        stream! {
        'task: loop {
            // Clear any buffered blocks.
            while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
                debug!(number=self.next_yield, "yielding block");
                self.next_yield += 1;
                yield known_block;
            }

            // Get the tip.
            let Some(block_number) = numbers_stream.next().await else {
                debug!("polling stream ended");
                break 'task;
            };
            trace!(%block_number, "got block number");
            if self.next_yield == NO_BLOCK_NUMBER {
                assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
                // This stream can be initialized after the first transaction has already been
                // sent. To avoid missing a transaction that is mined immediately, apply an offset
                // to the initial fetch so that we start from `tip - 1`.
                self.next_yield = block_number.saturating_sub(1);
            } else if block_number < self.next_yield {
                debug!(block_number, self.next_yield, "not advanced yet");
                continue 'task;
            }

            // Upgrade the provider.
            let Some(client) = self.client.upgrade() else {
                debug!("client dropped");
                break 'task;
            };

            // Then try to fill as many blocks as possible.
            // TODO: Maybe use `join_all`
            let mut retries = MAX_RETRIES;
            for number in self.next_yield..=block_number {
                debug!(number, "fetching block");
                let block = match client.request("eth_getBlockByNumber", (U64::from(number), false)).await {
                    Ok(Some(block)) => block,
                    Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
                        debug!(number, %err, "failed to fetch block, retrying");
                        retries -= 1;
                        continue;
                    }
                    Ok(None) if retries > 0 => {
                        debug!(number, "failed to fetch block (doesn't exist), retrying");
                        retries -= 1;
                        continue;
                    }
                    Err(err) => {
                        error!(number, %err, "failed to fetch block");
                        break;
                    }
                    Ok(None) => {
                        error!(number, "failed to fetch block (doesn't exist)");
                        break;
                    }
                };
                self.known_blocks.put(number, block);
                if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
                    // Cache is full, should be consumed before filling more blocks.
                    debug!(number, "cache full");
                    break;
                }
            }
        }
        }
    }
}

#[cfg(all(test, feature = "anvil-api"))] // Tests rely heavily on the ability to mine blocks on demand.
mod tests {
    use super::*;
    use crate::{ext::AnvilApi, Provider, ProviderBuilder};
    use alloy_node_bindings::Anvil;
    use std::{future::Future, time::Duration};

    async fn timeout<T: Future>(future: T) -> T::Output {
        try_timeout(future).await.expect("Timeout")
    }

    async fn try_timeout<T: Future>(future: T) -> Option<T::Output> {
        tokio::time::timeout(Duration::from_secs(2), future).await.ok()
    }

    #[tokio::test]
    async fn yield_block_http() {
        yield_block(false).await;
    }
    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_block_ws() {
        yield_block(true).await;
    }
    async fn yield_block(ws: bool) {
        let anvil = Anvil::new().spawn();

        let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
        let provider = ProviderBuilder::new().connect(&url).await.unwrap();

        let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut stream = Box::pin(new_blocks.into_stream());
        if ws {
            let _ = try_timeout(stream.next()).await; // Subscribe to newHeads.
        }

        // We will also use the provider to manipulate the anvil instance via RPC.
        provider.anvil_mine(Some(1), None).await.unwrap();

        let block = timeout(stream.next()).await.expect("Block wasn't fetched");
        assert_eq!(block.header.number, 1);
    }

    #[tokio::test]
    async fn yield_many_blocks_http() {
        yield_many_blocks(false).await;
    }
    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_many_blocks_ws() {
        yield_many_blocks(true).await;
    }
    async fn yield_many_blocks(ws: bool) {
        // Make sure that we can process more blocks than fit in the cache.
        const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;

        let anvil = Anvil::new().spawn();

        let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
        let provider = ProviderBuilder::new().connect(&url).await.unwrap();

        let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut stream = Box::pin(new_blocks.into_stream());
        if ws {
            let _ = try_timeout(stream.next()).await; // Subscribe to newHeads.
        }

        // We will also use the provider to manipulate the anvil instance via RPC.
        provider.anvil_mine(Some(BLOCKS_TO_MINE as u64), None).await.unwrap();

        let blocks = timeout(stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>()).await;
        assert_eq!(blocks.len(), BLOCKS_TO_MINE);
        let first = blocks[0].header.number;
        assert_eq!(first, 1);
        for (i, block) in blocks.iter().enumerate() {
            assert_eq!(block.header.number, first + i as u64);
        }
    }
}