Skip to main content

alloy_provider/provider/
watch_blocks_from.rs

1use super::WatchCanonicalBlocksFrom;
2use alloy_eips::BlockNumberOrTag;
3use alloy_json_rpc::{RpcError, RpcRecv};
4use alloy_network::{BlockResponse, Network};
5use alloy_network_primitives::{BlockTransactionsKind, HeaderResponse};
6use alloy_primitives::U64;
7use alloy_rpc_client::{RpcCall, RpcClientInner, WeakClient};
8use alloy_transport::{TransportError, TransportResult};
9use futures::{ready, Stream};
10use pin_project::pin_project;
11use std::{
12    future::Future,
13    marker::PhantomData,
14    pin::Pin,
15    sync::Arc,
16    task::{Context, Poll},
17    time::Duration,
18};
19
20#[cfg(all(target_family = "wasm", target_os = "unknown"))]
21use wasmtimer::{
22    std::Instant,
23    tokio::{interval_at, Interval},
24};
25
26#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
27use tokio::time::{interval_at, Instant, Interval};
28
29pub(super) const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1);
30
31#[derive(Debug)]
32pub(super) struct PollIntervalDelay {
33    timer: Option<Interval>,
34}
35
36impl PollIntervalDelay {
37    pub(super) fn new(poll_interval: Duration) -> Self {
38        if poll_interval.is_zero() {
39            return Self { timer: None };
40        }
41        Self { timer: Some(interval_at(Instant::now() + poll_interval, poll_interval)) }
42    }
43
44    pub(super) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
45        if let Some(timer) = &mut self.timer {
46            ready!(timer.poll_tick(cx));
47        }
48        Poll::Ready(())
49    }
50}
51
52/// Future that fetches one block by number.
53///
54/// This is the item yielded by [`WatchBlocksFromStream`], and is also reused by
55/// [`WatchLogsFrom`](super::WatchLogsFrom) to fetch the block paired with a log batch. The future
56/// requests `eth_getBlockByNumber` with either transaction hashes or full transaction bodies,
57/// depending on [`BlockTransactionsKind`].
58///
59/// A `null` block response means the requested height is not available yet. In that case the future
60/// sleeps for the configured poll interval and retries the same height. Other transport/RPC errors
61/// are returned to the caller, so retry policy should be configured on the provider transport.
62#[pin_project]
63#[derive(Debug)]
64pub struct BlockFut<T>
65where
66    T: BlockResponse + RpcRecv,
67{
68    client: Option<Arc<RpcClientInner>>,
69    block_number: u64,
70    kind: BlockTransactionsKind,
71    poll_interval: Duration,
72    #[pin]
73    state: BlockFutState<T>,
74}
75
76#[pin_project(project = BlockFutStateProj)]
77#[derive(Debug)]
78enum BlockFutState<T>
79where
80    T: BlockResponse + RpcRecv,
81{
82    /// Polling an `eth_getBlockByNumber` request for the target height.
83    Request {
84        #[pin]
85        call: RpcCall<(BlockNumberOrTag, bool), Option<T>>,
86    },
87    /// Waiting before retrying the same height after the node returned `null`.
88    Sleeping { delay: PollIntervalDelay },
89    /// Returning a prebuilt result, used when the parent stream cannot create a normal request.
90    Ready { result: Option<TransportResult<T>> },
91    /// Future has completed and must not be polled again.
92    Complete,
93}
94
95impl<T> BlockFut<T>
96where
97    T: BlockResponse + RpcRecv,
98{
99    pub(super) fn new(
100        client: Arc<RpcClientInner>,
101        block_number: u64,
102        kind: BlockTransactionsKind,
103        poll_interval: Duration,
104    ) -> Self {
105        let call = Self::block_request_call(&client, block_number, kind);
106        Self {
107            client: Some(client),
108            block_number,
109            kind,
110            poll_interval,
111            state: BlockFutState::Request { call },
112        }
113    }
114
115    pub(super) const fn err(err: TransportError) -> Self {
116        Self {
117            client: None,
118            block_number: 0,
119            kind: BlockTransactionsKind::Hashes,
120            poll_interval: Duration::from_secs(0),
121            state: BlockFutState::Ready { result: Some(Err(err)) },
122        }
123    }
124
125    fn block_request_call(
126        client: &Arc<RpcClientInner>,
127        block_number: u64,
128        kind: BlockTransactionsKind,
129    ) -> RpcCall<(BlockNumberOrTag, bool), Option<T>> {
130        client
131            .request("eth_getBlockByNumber", (BlockNumberOrTag::from(block_number), kind.is_full()))
132    }
133}
134
135impl<T> Future for BlockFut<T>
136where
137    T: BlockResponse + RpcRecv,
138{
139    type Output = TransportResult<T>;
140
141    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142        let mut this = self.project();
143
144        loop {
145            match this.state.as_mut().project() {
146                BlockFutStateProj::Request { call } => match ready!(call.poll(cx)) {
147                    Ok(Some(mut block)) => {
148                        if this.kind.is_hashes() && block.transactions().is_empty() {
149                            block.transactions_mut().convert_to_hashes();
150                        }
151                        this.state.set(BlockFutState::Complete);
152                        return Poll::Ready(Ok(block));
153                    }
154                    Ok(None) => {
155                        this.state.set(BlockFutState::Sleeping {
156                            delay: PollIntervalDelay::new(*this.poll_interval),
157                        });
158                    }
159                    Err(err) => {
160                        this.state.set(BlockFutState::Complete);
161                        return Poll::Ready(Err(err));
162                    }
163                },
164                BlockFutStateProj::Sleeping { delay } => {
165                    ready!(delay.poll(cx));
166                    let Some(client) = this.client.as_ref() else {
167                        this.state.set(BlockFutState::Complete);
168                        return Poll::Ready(Err(TransportError::local_usage_str(
169                            "provider was dropped",
170                        )));
171                    };
172                    this.state.set(BlockFutState::Request {
173                        call: Self::block_request_call(client, *this.block_number, *this.kind),
174                    });
175                }
176                BlockFutStateProj::Ready { result } => {
177                    let result = result.take().expect("polled BlockFut after completion");
178                    this.state.set(BlockFutState::Complete);
179                    return Poll::Ready(result);
180                }
181                BlockFutStateProj::Complete => panic!("polled BlockFut after completion"),
182            }
183        }
184    }
185}
186
187/// Future that resolves a block tag into a numeric head height.
188///
189/// `WatchBlocksFromStream` and `WatchLogsFromStream` use this before yielding per-height futures so
190/// they only schedule work through the configured head tag. Numeric tags and `earliest` resolve
191/// immediately. `latest` uses `eth_blockNumber`; other tags use `eth_getBlockByNumber` and return
192/// the tagged block's header number.
193#[pin_project]
194#[derive(Debug)]
195pub(super) struct FetchHeadFut<HeaderResp>
196where
197    HeaderResp: HeaderResponse + RpcRecv,
198{
199    #[pin]
200    state: FetchHeadFutState<HeaderResp>,
201}
202
203#[pin_project(project = FetchHeadFutStateProj)]
204#[derive(Debug)]
205enum FetchHeadFutState<HeaderResp>
206where
207    HeaderResp: HeaderResponse + RpcRecv,
208{
209    /// Polling `eth_blockNumber` for a `latest` head.
210    Latest {
211        #[pin]
212        call: RpcCall<[(); 0], U64>,
213    },
214    /// Polling `eth_getBlockByNumber` for a non-latest tag such as finalized or safe.
215    Tagged {
216        #[pin]
217        call: RpcCall<(BlockNumberOrTag, bool), Option<HeaderResp>>,
218    },
219    /// Returning an immediately known height, such as a numeric tag or `earliest`.
220    Ready { result: Option<TransportResult<u64>> },
221    /// Future has completed and must not be polled again.
222    Complete,
223}
224
225impl<HeaderResp> FetchHeadFut<HeaderResp>
226where
227    HeaderResp: HeaderResponse + RpcRecv,
228{
229    pub(super) fn new(client: Arc<RpcClientInner>, tag: BlockNumberOrTag) -> Self {
230        let state = match tag {
231            BlockNumberOrTag::Number(number) => {
232                FetchHeadFutState::Ready { result: Some(Ok(number)) }
233            }
234            BlockNumberOrTag::Earliest => FetchHeadFutState::Ready { result: Some(Ok(0)) },
235            BlockNumberOrTag::Latest => {
236                FetchHeadFutState::Latest { call: client.request_noparams("eth_blockNumber") }
237            }
238            _ => FetchHeadFutState::Tagged {
239                call: client.request("eth_getBlockByNumber", (tag, false)),
240            },
241        };
242        Self { state }
243    }
244}
245
246impl<HeaderResp> Future for FetchHeadFut<HeaderResp>
247where
248    HeaderResp: HeaderResponse + RpcRecv,
249{
250    type Output = TransportResult<u64>;
251
252    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253        let mut this = self.project();
254
255        match this.state.as_mut().project() {
256            FetchHeadFutStateProj::Latest { call } => {
257                let result = ready!(call.poll(cx)).map(|n| n.to());
258                this.state.set(FetchHeadFutState::Complete);
259                Poll::Ready(result)
260            }
261            FetchHeadFutStateProj::Tagged { call } => {
262                let result = match ready!(call.poll(cx)) {
263                    Ok(resp) => resp.map(|header| header.number()).ok_or(RpcError::NullResp),
264                    Err(err) => Err(err),
265                };
266                this.state.set(FetchHeadFutState::Complete);
267                Poll::Ready(result)
268            }
269            FetchHeadFutStateProj::Ready { result } => {
270                let result = result.take().expect("polled FetchHeadFut after completion");
271                this.state.set(FetchHeadFutState::Complete);
272                Poll::Ready(result)
273            }
274            FetchHeadFutStateProj::Complete => panic!("polled FetchHeadFut after completion"),
275        }
276    }
277}
278
279/// A builder for streaming blocks from a historical block and continuing indefinitely.
280#[derive(Debug, Clone)]
281#[must_use = "this builder does nothing unless you call `.into_stream`"]
282pub struct WatchBlocksFrom<N: Network> {
283    client: WeakClient,
284    start_block: u64,
285    poll_interval: Duration,
286    block_tag: BlockNumberOrTag,
287    kind: BlockTransactionsKind,
288    _phantom: PhantomData<fn() -> N>,
289}
290
291impl<N: Network> WatchBlocksFrom<N> {
292    /// Creates a new [`WatchBlocksFrom`] builder.
293    pub(crate) const fn new(client: WeakClient, start_block: u64) -> Self {
294        Self {
295            client,
296            start_block,
297            poll_interval: DEFAULT_POLL_INTERVAL,
298            block_tag: BlockNumberOrTag::Latest,
299            kind: BlockTransactionsKind::Hashes,
300            _phantom: PhantomData,
301        }
302    }
303
304    /// Streams blocks with full transaction bodies.
305    pub const fn full(mut self) -> Self {
306        self.kind = BlockTransactionsKind::Full;
307        self
308    }
309
310    /// Streams blocks with transaction hashes only.
311    pub const fn hashes(mut self) -> Self {
312        self.kind = BlockTransactionsKind::Hashes;
313        self
314    }
315
316    /// Sets the poll interval used when the stream is caught up.
317    pub const fn poll_interval(mut self, poll_interval: Duration) -> Self {
318        self.poll_interval = poll_interval;
319        self
320    }
321
322    /// Sets the head block tag used to determine stream progress.
323    pub const fn block_tag(mut self, block_tag: BlockNumberOrTag) -> Self {
324        self.block_tag = block_tag;
325        self
326    }
327
328    /// Converts this builder into a canonical-stream builder that emits
329    /// [`crate::CanonicalEvent`] deltas on reorgs.
330    pub const fn canonical(self) -> WatchCanonicalBlocksFrom<N> {
331        WatchCanonicalBlocksFrom::new(self)
332    }
333
334    /// Creates a future that fetches a single block by number.
335    pub(super) fn get_block(&self, block_number: u64) -> BlockFut<N::BlockResponse> {
336        self.client
337            .upgrade()
338            .map(|client| BlockFut::new(client, block_number, self.kind, self.poll_interval))
339            .unwrap_or_else(|| BlockFut::err(RpcError::local_usage_str("provider was dropped")))
340    }
341
342    /// Stream blocks from a historical block using sequential `eth_getBlockByNumber` calls.
343    ///
344    /// This stream continues polling after catching up and continues yielding new blocks
345    /// indefinitely.
346    ///
347    /// This stream _does not_ handle reorgs. Instead, each item yielded from the stream
348    /// is strictly ordered in terms of block number, regardless of the blocks parent.
349    ///
350    /// For example (height, hash, parent):
351    ///
352    /// You should expect blocks in order by number with no gaps and with disjoint parents:
353    /// [(1, 1A, 0A),(2, 2A, 1A),(3,3B,2B)]
354    ///
355    /// And you should not expect receiving two blocks with the same number:
356    /// [(1, 1A, 0A),(2, 2A, 1A),(2,2B,1A)]
357    ///
358    /// Each yielded future contains one block request.
359    ///
360    /// If a block request returns `NullResp`, the yielded future retries the same block until it
361    /// succeeds.
362    ///
363    /// Other errors are surfaced to the caller. Configure retries on the underlying client
364    /// transport (for example with `RetryBackoffLayer`) for transport-level retry behavior.
365    ///
366    /// This can be buffered by the caller, for example with
367    /// [`StreamExt::buffered`](futures::StreamExt::buffered).
368    pub const fn into_stream(self) -> WatchBlocksFromStream<N> {
369        let current_block = self.start_block;
370        WatchBlocksFromStream {
371            inner: self,
372            current_block,
373            head: 0,
374            state: WatchBlocksFromState::FetchHead,
375        }
376    }
377}
378
379/// A stream of block-fetching futures produced by [`WatchBlocksFrom`].
380///
381/// Each item is a [`BlockFut`] that, when awaited, fetches one block via
382/// `eth_getBlockByNumber`. Callers typically apply
383/// [`StreamExt::buffered`](futures::StreamExt::buffered) to resolve
384/// multiple block requests concurrently.
385#[derive(Debug)]
386pub struct WatchBlocksFromStream<N: Network> {
387    inner: WatchBlocksFrom<N>,
388    current_block: u64,
389    head: u64,
390    state: WatchBlocksFromState<N>,
391}
392
393#[derive(Debug)]
394enum WatchBlocksFromState<N: Network> {
395    /// Upgrade the client and begin fetching head.
396    FetchHead,
397    /// Polling the in-flight head-block-number future.
398    FetchingHead { fut: FetchHeadFut<N::HeaderResponse> },
399    /// Yielding block futures for `current_block..=head`.
400    Yielding,
401    /// Sleeping between poll cycles.
402    Sleeping { delay: PollIntervalDelay },
403    /// Stream terminated.
404    Done,
405}
406
407impl<N: Network> Stream for WatchBlocksFromStream<N> {
408    type Item = BlockFut<N::BlockResponse>;
409
410    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
411        let this = self.get_mut();
412
413        loop {
414            match &mut this.state {
415                WatchBlocksFromState::FetchHead => {
416                    let Some(client) = this.inner.client.upgrade() else {
417                        this.state = WatchBlocksFromState::Done;
418                        continue;
419                    };
420                    let fut = FetchHeadFut::new(client, this.inner.block_tag);
421                    this.state = WatchBlocksFromState::FetchingHead { fut };
422                }
423                WatchBlocksFromState::FetchingHead { fut } => {
424                    match ready!(Pin::new(fut).poll(cx)) {
425                        Ok(head) => {
426                            this.head = head;
427                            if this.current_block > head {
428                                this.state = WatchBlocksFromState::Sleeping {
429                                    delay: PollIntervalDelay::new(this.inner.poll_interval),
430                                };
431                            } else {
432                                this.state = WatchBlocksFromState::Yielding;
433                            }
434                        }
435                        Err(err) => {
436                            this.state = WatchBlocksFromState::Sleeping {
437                                delay: PollIntervalDelay::new(this.inner.poll_interval),
438                            };
439                            return Poll::Ready(Some(BlockFut::err(err)));
440                        }
441                    }
442                }
443                WatchBlocksFromState::Yielding => {
444                    if this.current_block > this.head {
445                        this.state = WatchBlocksFromState::Sleeping {
446                            delay: PollIntervalDelay::new(this.inner.poll_interval),
447                        };
448                        continue;
449                    }
450
451                    let next_block = this.current_block.saturating_add(1);
452                    if next_block <= this.current_block {
453                        let err = RpcError::local_usage_str(
454                            "watch stream step did not advance block cursor",
455                        );
456                        this.state = WatchBlocksFromState::Sleeping {
457                            delay: PollIntervalDelay::new(this.inner.poll_interval),
458                        };
459                        return Poll::Ready(Some(BlockFut::err(err)));
460                    }
461
462                    let Some(client) = this.inner.client.upgrade() else {
463                        this.state = WatchBlocksFromState::Done;
464                        continue;
465                    };
466
467                    let item_fut: BlockFut<N::BlockResponse> = BlockFut::new(
468                        client,
469                        this.current_block,
470                        this.inner.kind,
471                        this.inner.poll_interval,
472                    );
473                    this.current_block = next_block;
474                    return Poll::Ready(Some(item_fut));
475                }
476                WatchBlocksFromState::Sleeping { delay } => {
477                    ready!(delay.poll(cx));
478                    this.state = WatchBlocksFromState::FetchHead;
479                }
480                WatchBlocksFromState::Done => return Poll::Ready(None),
481            }
482        }
483    }
484}
485
486#[cfg(test)]
487mod tests {
488    use super::*;
489    use crate::{Provider, ProviderBuilder};
490    use alloy_rpc_client::RpcClient;
491    use alloy_rpc_types_eth::Block;
492    use alloy_transport::{
493        layers::{RetryBackoffLayer, RetryPolicy},
494        mock::MockTransport,
495    };
496    use futures::StreamExt;
497    use tokio::time::timeout;
498
499    fn block(number: u64) -> Block {
500        let mut block: Block = Block::default();
501        block.header.inner.number = number;
502        block
503    }
504
505    #[tokio::test]
506    async fn streams_blocks_from_start_block() {
507        let asserter = alloy_transport::mock::Asserter::new();
508        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
509
510        asserter.push_success(&3_u64);
511        asserter.push_success(&Some(block(1)));
512        asserter.push_success(&Some(block(2)));
513        asserter.push_success(&Some(block(3)));
514
515        let mut stream = provider
516            .watch_blocks_from(1)
517            .block_tag(BlockNumberOrTag::Latest)
518            .poll_interval(Duration::from_millis(1))
519            .into_stream()
520            .buffered(1);
521
522        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
523        assert_eq!(first.header.number, 1);
524
525        let second =
526            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
527        assert_eq!(second.header.number, 2);
528
529        let third = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
530        assert_eq!(third.header.number, 3);
531    }
532
533    #[tokio::test]
534    async fn advances_to_next_block_after_error() {
535        let asserter = alloy_transport::mock::Asserter::new();
536        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
537
538        asserter.push_success(&1_u64);
539        asserter.push_failure_msg("boom");
540        asserter.push_success(&2_u64);
541        asserter.push_success(&Some(block(2)));
542
543        let mut stream = provider
544            .watch_blocks_from(1)
545            .block_tag(BlockNumberOrTag::Latest)
546            .poll_interval(Duration::from_millis(1))
547            .into_stream()
548            .buffered(1);
549
550        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
551        assert!(first.is_err());
552
553        let second =
554            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
555        assert_eq!(second.header.number, 2);
556    }
557
558    #[tokio::test]
559    async fn retries_same_block_after_null_response() {
560        let asserter = alloy_transport::mock::Asserter::new();
561        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
562
563        let no_block: Option<Block> = None;
564        asserter.push_success(&1_u64);
565        asserter.push_success(&no_block);
566        asserter.push_success(&Some(block(1)));
567        asserter.push_success(&2_u64);
568        asserter.push_success(&Some(block(2)));
569
570        let mut stream = provider
571            .watch_blocks_from(1)
572            .block_tag(BlockNumberOrTag::Latest)
573            .poll_interval(Duration::from_millis(1))
574            .into_stream()
575            .buffered(1);
576
577        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
578        assert_eq!(first.header.number, 1);
579
580        let second =
581            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
582        assert_eq!(second.header.number, 2);
583    }
584
585    #[tokio::test]
586    async fn recovers_after_head_fetch_error() {
587        let asserter = alloy_transport::mock::Asserter::new();
588        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
589
590        asserter.push_failure_msg("head boom");
591        asserter.push_success(&1_u64);
592        asserter.push_success(&Some(block(1)));
593
594        let mut stream = provider
595            .watch_blocks_from(1)
596            .block_tag(BlockNumberOrTag::Latest)
597            .poll_interval(Duration::from_millis(1))
598            .into_stream()
599            .buffered(1);
600
601        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
602        assert!(first.is_err());
603
604        let second =
605            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
606        assert_eq!(second.header.number, 1);
607    }
608
609    #[tokio::test]
610    async fn uses_provider_retry_layer() {
611        #[derive(Clone, Debug)]
612        struct AlwaysRetryPolicy;
613
614        impl RetryPolicy for AlwaysRetryPolicy {
615            fn should_retry(&self, _error: &alloy_transport::TransportError) -> bool {
616                true
617            }
618
619            fn backoff_hint(&self, _error: &alloy_transport::TransportError) -> Option<Duration> {
620                None
621            }
622        }
623
624        let asserter = alloy_transport::mock::Asserter::new();
625        let retry_layer = RetryBackoffLayer::new_with_policy(3, 0, 10_000, AlwaysRetryPolicy);
626        let client = RpcClient::builder()
627            .layer(retry_layer)
628            .transport(MockTransport::new(asserter.clone()), true);
629        let provider = ProviderBuilder::new().connect_client(client);
630
631        asserter.push_failure_msg("temporary head error");
632        asserter.push_success(&1_u64);
633        asserter.push_failure_msg("temporary block error");
634        asserter.push_success(&Some(block(1)));
635
636        let mut stream = provider
637            .watch_blocks_from(1)
638            .block_tag(BlockNumberOrTag::Latest)
639            .poll_interval(Duration::from_millis(1))
640            .into_stream()
641            .buffered(1);
642
643        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
644        assert_eq!(first.header.number, 1);
645    }
646
647    #[tokio::test]
648    async fn waits_until_head_reaches_start_block() {
649        let asserter = alloy_transport::mock::Asserter::new();
650        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
651
652        asserter.push_success(&0_u64);
653        asserter.push_success(&1_u64);
654        asserter.push_success(&Some(block(1)));
655
656        let mut stream = provider
657            .watch_blocks_from(1)
658            .block_tag(BlockNumberOrTag::Latest)
659            .poll_interval(Duration::from_millis(1))
660            .into_stream()
661            .buffered(1);
662
663        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
664        assert_eq!(first.header.number, 1);
665    }
666
667    #[tokio::test]
668    async fn fixed_block_tag_number_does_not_fetch_head() {
669        let asserter = alloy_transport::mock::Asserter::new();
670        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
671
672        asserter.push_success(&Some(block(5)));
673
674        let mut stream = provider
675            .watch_blocks_from(5)
676            .block_tag(BlockNumberOrTag::Number(5))
677            .poll_interval(Duration::from_millis(1))
678            .into_stream()
679            .buffered(1);
680
681        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
682        assert_eq!(first.header.number, 5);
683    }
684
685    #[tokio::test]
686    async fn earliest_block_tag_starts_at_zero() {
687        let asserter = alloy_transport::mock::Asserter::new();
688        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
689
690        asserter.push_success(&Some(block(0)));
691
692        let mut stream = provider
693            .watch_blocks_from(0)
694            .block_tag(BlockNumberOrTag::Earliest)
695            .poll_interval(Duration::from_millis(1))
696            .into_stream()
697            .buffered(1);
698
699        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
700        assert_eq!(first.header.number, 0);
701    }
702
703    #[tokio::test]
704    async fn stream_ends_when_provider_is_dropped() {
705        let provider =
706            ProviderBuilder::new().connect_mocked_client(alloy_transport::mock::Asserter::new());
707        let mut stream = provider.watch_blocks_from(0).into_stream();
708        drop(provider);
709
710        let next = timeout(Duration::from_secs(1), stream.next()).await.unwrap();
711        assert!(next.is_none());
712    }
713
714    #[tokio::test]
715    async fn yielded_future_outlives_provider() {
716        let asserter = alloy_transport::mock::Asserter::new();
717        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
718
719        asserter.push_success(&1_u64);
720        asserter.push_success(&Some(block(1)));
721
722        let mut stream = provider
723            .watch_blocks_from(1)
724            .block_tag(BlockNumberOrTag::Latest)
725            .poll_interval(Duration::from_millis(1))
726            .into_stream();
727
728        let fut = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
729        drop(stream);
730        drop(provider);
731
732        let block = timeout(Duration::from_secs(1), fut).await.unwrap().unwrap();
733        assert_eq!(block.header.number, 1);
734    }
735
736    #[tokio::test]
737    async fn multiple_yielded_futures_outlive_provider() {
738        let asserter = alloy_transport::mock::Asserter::new();
739        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
740
741        asserter.push_success(&2_u64);
742        asserter.push_success(&Some(block(1)));
743        asserter.push_success(&Some(block(2)));
744
745        let mut stream = provider
746            .watch_blocks_from(1)
747            .block_tag(BlockNumberOrTag::Latest)
748            .poll_interval(Duration::from_millis(1))
749            .into_stream();
750
751        let fut1 = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
752        let fut2 = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
753        drop(provider);
754
755        let first = timeout(Duration::from_secs(1), fut1).await.unwrap().unwrap();
756        let second = timeout(Duration::from_secs(1), fut2).await.unwrap().unwrap();
757        assert_eq!(first.header.number, 1);
758        assert_eq!(second.header.number, 2);
759    }
760
761    #[tokio::test]
762    async fn errors_when_cursor_cannot_advance() {
763        let asserter = alloy_transport::mock::Asserter::new();
764        let provider = ProviderBuilder::new().connect_mocked_client(asserter);
765
766        let mut stream = provider
767            .watch_blocks_from(u64::MAX)
768            .block_tag(BlockNumberOrTag::Number(u64::MAX))
769            .poll_interval(Duration::from_millis(1))
770            .into_stream()
771            .buffered(1);
772
773        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
774        let err = first.unwrap_err();
775        assert!(err.is_local_usage_error());
776    }
777
778    #[tokio::test]
779    async fn future_stream_can_be_buffered() {
780        let asserter = alloy_transport::mock::Asserter::new();
781        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
782
783        asserter.push_success(&2_u64);
784        asserter.push_success(&Some(block(1)));
785        asserter.push_success(&Some(block(2)));
786
787        let mut stream = provider
788            .watch_blocks_from(1)
789            .block_tag(BlockNumberOrTag::Latest)
790            .poll_interval(Duration::from_millis(1))
791            .into_stream()
792            .buffered(2);
793
794        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
795        assert_eq!(first.header.number, 1);
796
797        let second =
798            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
799        assert_eq!(second.header.number, 2);
800    }
801
802    #[tokio::test]
803    async fn buffered_stream_does_not_skip_after_null_response() {
804        let asserter = alloy_transport::mock::Asserter::new();
805        let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
806
807        let no_block: Option<Block> = None;
808        asserter.push_success(&2_u64);
809        asserter.push_success(&no_block);
810        asserter.push_success(&Some(block(2)));
811        asserter.push_success(&Some(block(1)));
812
813        let mut stream = provider
814            .watch_blocks_from(1)
815            .block_tag(BlockNumberOrTag::Latest)
816            .poll_interval(Duration::from_millis(1))
817            .into_stream()
818            .buffered(2);
819
820        let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
821        assert_eq!(first.header.number, 1);
822
823        let second =
824            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
825        assert_eq!(second.header.number, 2);
826    }
827}