alloy_provider/blocks.rs

use alloy_network::{Ethereum, Network};
use alloy_primitives::{BlockNumber, U64};
use alloy_rpc_client::{NoParams, PollerBuilder, WeakClient};
use alloy_transport::RpcError;
use async_stream::stream;
use futures::{Stream, StreamExt};
use lru::LruCache;
use std::{
    marker::PhantomData,
    num::NonZeroUsize,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};

#[cfg(feature = "pubsub")]
use futures::{future::Either, FutureExt};

/// The size of the block cache.
const BLOCK_CACHE_SIZE: NonZeroUsize = NonZeroUsize::new(10).unwrap();

/// Maximum number of retries for fetching a block.
const MAX_RETRIES: usize = 3;

/// Default block number for when we don't have a block yet.
const NO_BLOCK_NUMBER: BlockNumber = BlockNumber::MAX;

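/// Pause flag shared between the provider and the block-streaming task.
///
/// While paused, the loop in `NewBlocks::into_block_stream` stops yielding new blocks;
/// unpausing wakes any waiters via the inner `tokio::sync::Notify`.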
#[derive(Default)]
pub(crate) struct Paused {
    is_paused: AtomicBool,
    notify: tokio::sync::Notify,
}

impl Paused {
    pub(crate) fn is_paused(&self) -> bool {
        self.is_paused.load(Ordering::Acquire)
    }

    pub(crate) fn set_paused(&self, paused: bool) {
        self.is_paused.store(paused, Ordering::Release);
        if !paused {
            self.notify.notify_waiters();
        }
    }

    /// Waits until the paused state is changed to `false`.
    ///
    /// Returns `true` if the method actually waited for the paused state to become unpaused,
    /// or `false` if it was already unpaused when called.
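    ///
    /// A minimal usage sketch (illustrative only, not a doctest):
    ///
    /// ```ignore
    /// let paused = Arc::new(Paused::default());
    /// assert!(!paused.wait().await); // not paused: returns `false` immediately
    /// paused.set_paused(true);       // a task awaiting `wait()` now blocks
    /// paused.set_paused(false);      // wakes it; that `wait()` call returns `true`
    /// ```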
    async fn wait(&self) -> bool {
        if !self.is_paused() {
            return false;
        }
        self.notify.notified().await;
        debug_assert!(!self.is_paused());
        true
    }
}

/// Streams new blocks from the client.
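///
/// Consumption sketch (illustrative; assumes a `WeakClient` named `weak_client` is in scope):
///
/// ```ignore
/// let new_blocks = NewBlocks::<Ethereum>::new(weak_client);
/// let mut stream = Box::pin(new_blocks.into_stream());
/// while let Some(block) = stream.next().await {
///     println!("new block: {}", block.header.number);
/// }
/// ```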
pub(crate) struct NewBlocks<N: Network = Ethereum> {
    client: WeakClient,
    /// The next block to yield.
    /// [`NO_BLOCK_NUMBER`] indicates that it will be updated on the first poll.
    /// Only used by the polling task.
    next_yield: BlockNumber,
    /// LRU cache of known blocks. Only used by the polling task.
    known_blocks: LruCache<BlockNumber, N::BlockResponse>,
    pub(crate) paused: Arc<Paused>,
    _phantom: PhantomData<N>,
}

impl<N: Network> NewBlocks<N> {
    pub(crate) fn new(client: WeakClient) -> Self {
        Self {
            client,
            next_yield: NO_BLOCK_NUMBER,
            known_blocks: LruCache::new(BLOCK_CACHE_SIZE),
            paused: Arc::default(),
            _phantom: PhantomData,
        }
    }

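    /// Sets the first block number to yield (test-only helper; bypasses the initial
    /// tip-based initialization of `next_yield`).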
    #[cfg(test)]
    const fn with_next_yield(mut self, next_yield: u64) -> Self {
        self.next_yield = next_yield;
        self
    }

    pub(crate) fn into_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
        // Return a stream that lazily subscribes to `newHeads` on the first poll.
        #[cfg(feature = "pubsub")]
        if let Some(client) = self.client.upgrade() {
            if client.pubsub_frontend().is_some() {
                let subscriber = self.into_subscription_stream().map(futures::stream::iter);
                let subscriber = futures::stream::once(subscriber);
                return Either::Left(subscriber.flatten().flatten());
            }
        }

        // Returns a stream that lazily initializes an `eth_blockNumber` polling task on the first
        // poll, mapped with `eth_getBlockByNumber`.
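        // `right` wraps the poll stream in `Either::Right` to match the subscription arm's
        // return type when "pubsub" is enabled, and is a plain identity function otherwise.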
        #[cfg(feature = "pubsub")]
        let right = Either::Right;
        #[cfg(not(feature = "pubsub"))]
        let right = std::convert::identity;
        right(self.into_poll_stream())
    }

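    /// Subscribes to `newHeads` over the pubsub transport and maps each header to its block
    /// number, feeding the shared block-fetching loop. Returns `None` if the client was dropped
    /// or the subscription could not be established.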
    #[cfg(feature = "pubsub")]
    async fn into_subscription_stream(
        self,
    ) -> Option<impl Stream<Item = N::BlockResponse> + 'static> {
        use alloy_consensus::BlockHeader;

        let Some(client) = self.client.upgrade() else {
            debug!("client dropped");
            return None;
        };
        let Some(pubsub) = client.pubsub_frontend() else {
            error!("pubsub_frontend returned None after being Some");
            return None;
        };
        let id = match client.request("eth_subscribe", ("newHeads",)).await {
            Ok(id) => id,
            Err(err) => {
                error!(%err, "failed to subscribe to newHeads");
                return None;
            }
        };
        let sub = match pubsub.get_subscription(id).await {
            Ok(sub) => sub,
            Err(err) => {
                error!(%err, "failed to get subscription");
                return None;
            }
        };
        let stream =
            sub.into_typed::<N::HeaderResponse>().into_stream().map(|header| header.number());
        Some(self.into_block_stream(stream))
    }

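    /// Polls `eth_blockNumber` via a `PollerBuilder` and feeds the resulting block numbers into
    /// the shared block-fetching loop.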
    fn into_poll_stream(self) -> impl Stream<Item = N::BlockResponse> + 'static {
        // Spawned lazily on the first `poll`.
        let stream =
            PollerBuilder::<NoParams, U64>::new(self.client.clone(), "eth_blockNumber", [])
                .into_stream()
                .map(|n| n.to());

        self.into_block_stream(stream)
    }

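    /// Core loop shared by the polling and subscription paths: drains buffered blocks from the
    /// LRU cache, honors the pause flag, reads the next tip from `numbers_stream`, and fetches
    /// the missing range with `eth_getBlockByNumber` (retrying recoverable failures).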
    fn into_block_stream(
        mut self,
        mut numbers_stream: impl Stream<Item = u64> + Unpin + 'static,
    ) -> impl Stream<Item = N::BlockResponse> + 'static {
        stream! {
        'task: loop {
            // Clear any buffered blocks.
            while let Some(known_block) = self.known_blocks.pop(&self.next_yield) {
                debug!(number=self.next_yield, "yielding block");
                self.next_yield += 1;
                yield known_block;
            }

            // If we're paused, wait until we're unpaused.
            // Once unpaused, reset `self.next_yield` to ignore the blocks that were included
            // while we were paused.
            let unpaused = self.paused.wait().await;

            // Get the tip.
            let Some(block_number) = numbers_stream.next().await else {
                debug!("polling stream ended");
                break 'task;
            };
            trace!(%block_number, "got block number");
            if self.next_yield == NO_BLOCK_NUMBER || unpaused {
                assert!(block_number < NO_BLOCK_NUMBER, "too many blocks");
                // This stream can be initialized after the first transaction has already been
                // sent. To avoid the edge case where that transaction is mined immediately,
                // offset the initial fetch so that we start at `tip - 1`.
                self.next_yield = block_number.saturating_sub(1);
            } else if block_number < self.next_yield {
                debug!(block_number, self.next_yield, "not advanced yet");
                continue 'task;
            }

            // Upgrade the provider.
            let Some(client) = self.client.upgrade() else {
                debug!("client dropped");
                break 'task;
            };

            // Then try to fill as many blocks as possible.
            // TODO: Maybe use `join_all`
            let mut retries = MAX_RETRIES;
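            // NOTE: on a retried failure we `continue` to the next number rather than re-request
            // the same one; the skipped block is fetched again on a later iteration because
            // `next_yield` only advances when a block is actually yielded.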
            for number in self.next_yield..=block_number {
                debug!(number, "fetching block");
                let block = match client
                    .request("eth_getBlockByNumber", (U64::from(number), false))
                    .await
                {
                    Ok(Some(block)) => block,
                    Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
                        debug!(number, %err, "failed to fetch block, retrying");
                        retries -= 1;
                        continue;
                    }
                    Ok(None) if retries > 0 => {
                        debug!(number, "failed to fetch block (doesn't exist), retrying");
                        retries -= 1;
                        continue;
                    }
                    Err(err) => {
                        error!(number, %err, "failed to fetch block");
                        break;
                    }
                    Ok(None) => {
                        error!(number, "failed to fetch block (doesn't exist)");
                        break;
                    }
                };
                self.known_blocks.put(number, block);
                if self.known_blocks.len() == BLOCK_CACHE_SIZE.get() {
                    // Cache is full, should be consumed before filling more blocks.
                    debug!(number, "cache full");
                    break;
                }
            }
        }
        }
    }
}

#[cfg(all(test, feature = "anvil-api"))] // Tests rely heavily on the ability to mine blocks on demand.
mod tests {
    use super::*;
    use crate::{ext::AnvilApi, Provider, ProviderBuilder};
    use alloy_node_bindings::Anvil;
    use std::{future::Future, time::Duration};

    async fn timeout<T: Future>(future: T) -> T::Output {
        try_timeout(future).await.expect("Timeout")
    }

    async fn try_timeout<T: Future>(future: T) -> Option<T::Output> {
        tokio::time::timeout(Duration::from_secs(2), future).await.ok()
    }

    #[tokio::test]
    async fn yield_block_http() {
        yield_block(false).await;
    }
    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_block_ws() {
        yield_block(true).await;
    }
    async fn yield_block(ws: bool) {
        let anvil = Anvil::new().spawn();

        let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
        let provider = ProviderBuilder::new().connect(&url).await.unwrap();

        let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut stream = Box::pin(new_blocks.into_stream());
        if ws {
            let _ = try_timeout(stream.next()).await; // Subscribe to newHeads.
        }

        // We will also use the provider to manipulate the anvil instance via RPC.
        provider.anvil_mine(Some(1), None).await.unwrap();

        let block = timeout(stream.next()).await.expect("Block wasn't fetched");
        assert_eq!(block.header.number, 1);
    }

    #[tokio::test]
    async fn yield_many_blocks_http() {
        yield_many_blocks(false).await;
    }
    #[tokio::test]
    #[cfg(feature = "ws")]
    async fn yield_many_blocks_ws() {
        yield_many_blocks(true).await;
    }
    async fn yield_many_blocks(ws: bool) {
        // Make sure that we can process more blocks than fit in the cache.
        const BLOCKS_TO_MINE: usize = BLOCK_CACHE_SIZE.get() + 1;

        let anvil = Anvil::new().spawn();

        let url = if ws { anvil.ws_endpoint() } else { anvil.endpoint() };
        let provider = ProviderBuilder::new().connect(&url).await.unwrap();

        let new_blocks = NewBlocks::<Ethereum>::new(provider.weak_client()).with_next_yield(1);
        let mut stream = Box::pin(new_blocks.into_stream());
        if ws {
            let _ = try_timeout(stream.next()).await; // Subscribe to newHeads.
        }

        // We will also use the provider to manipulate the anvil instance via RPC.
        provider.anvil_mine(Some(BLOCKS_TO_MINE as u64), None).await.unwrap();

        let blocks = timeout(stream.take(BLOCKS_TO_MINE).collect::<Vec<_>>()).await;
        assert_eq!(blocks.len(), BLOCKS_TO_MINE);
        let first = blocks[0].header.number;
        assert_eq!(first, 1);
        for (i, block) in blocks.iter().enumerate() {
            assert_eq!(block.header.number, first + i as u64);
        }
    }
}