1use super::WatchCanonicalBlocksFrom;
2use alloy_eips::BlockNumberOrTag;
3use alloy_json_rpc::{RpcError, RpcRecv};
4use alloy_network::{BlockResponse, Network};
5use alloy_network_primitives::{BlockTransactionsKind, HeaderResponse};
6use alloy_primitives::U64;
7use alloy_rpc_client::{RpcCall, RpcClientInner, WeakClient};
8use alloy_transport::{TransportError, TransportResult};
9use futures::{ready, Stream};
10use pin_project::pin_project;
11use std::{
12 future::Future,
13 marker::PhantomData,
14 pin::Pin,
15 sync::Arc,
16 task::{Context, Poll},
17 time::Duration,
18};
19
20#[cfg(all(target_family = "wasm", target_os = "unknown"))]
21use wasmtimer::{
22 std::Instant,
23 tokio::{interval_at, Interval},
24};
25
26#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
27use tokio::time::{interval_at, Instant, Interval};
28
29pub(super) const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(1);
30
31#[derive(Debug)]
32pub(super) struct PollIntervalDelay {
33 timer: Option<Interval>,
34}
35
36impl PollIntervalDelay {
37 pub(super) fn new(poll_interval: Duration) -> Self {
38 if poll_interval.is_zero() {
39 return Self { timer: None };
40 }
41 Self { timer: Some(interval_at(Instant::now() + poll_interval, poll_interval)) }
42 }
43
44 pub(super) fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
45 if let Some(timer) = &mut self.timer {
46 ready!(timer.poll_tick(cx));
47 }
48 Poll::Ready(())
49 }
50}
51
52#[pin_project]
63#[derive(Debug)]
64pub struct BlockFut<T>
65where
66 T: BlockResponse + RpcRecv,
67{
68 client: Option<Arc<RpcClientInner>>,
69 block_number: u64,
70 kind: BlockTransactionsKind,
71 poll_interval: Duration,
72 #[pin]
73 state: BlockFutState<T>,
74}
75
76#[pin_project(project = BlockFutStateProj)]
77#[derive(Debug)]
78enum BlockFutState<T>
79where
80 T: BlockResponse + RpcRecv,
81{
82 Request {
84 #[pin]
85 call: RpcCall<(BlockNumberOrTag, bool), Option<T>>,
86 },
87 Sleeping { delay: PollIntervalDelay },
89 Ready { result: Option<TransportResult<T>> },
91 Complete,
93}
94
95impl<T> BlockFut<T>
96where
97 T: BlockResponse + RpcRecv,
98{
99 pub(super) fn new(
100 client: Arc<RpcClientInner>,
101 block_number: u64,
102 kind: BlockTransactionsKind,
103 poll_interval: Duration,
104 ) -> Self {
105 let call = Self::block_request_call(&client, block_number, kind);
106 Self {
107 client: Some(client),
108 block_number,
109 kind,
110 poll_interval,
111 state: BlockFutState::Request { call },
112 }
113 }
114
115 pub(super) const fn err(err: TransportError) -> Self {
116 Self {
117 client: None,
118 block_number: 0,
119 kind: BlockTransactionsKind::Hashes,
120 poll_interval: Duration::from_secs(0),
121 state: BlockFutState::Ready { result: Some(Err(err)) },
122 }
123 }
124
125 fn block_request_call(
126 client: &Arc<RpcClientInner>,
127 block_number: u64,
128 kind: BlockTransactionsKind,
129 ) -> RpcCall<(BlockNumberOrTag, bool), Option<T>> {
130 client
131 .request("eth_getBlockByNumber", (BlockNumberOrTag::from(block_number), kind.is_full()))
132 }
133}
134
135impl<T> Future for BlockFut<T>
136where
137 T: BlockResponse + RpcRecv,
138{
139 type Output = TransportResult<T>;
140
141 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142 let mut this = self.project();
143
144 loop {
145 match this.state.as_mut().project() {
146 BlockFutStateProj::Request { call } => match ready!(call.poll(cx)) {
147 Ok(Some(mut block)) => {
148 if this.kind.is_hashes() && block.transactions().is_empty() {
149 block.transactions_mut().convert_to_hashes();
150 }
151 this.state.set(BlockFutState::Complete);
152 return Poll::Ready(Ok(block));
153 }
154 Ok(None) => {
155 this.state.set(BlockFutState::Sleeping {
156 delay: PollIntervalDelay::new(*this.poll_interval),
157 });
158 }
159 Err(err) => {
160 this.state.set(BlockFutState::Complete);
161 return Poll::Ready(Err(err));
162 }
163 },
164 BlockFutStateProj::Sleeping { delay } => {
165 ready!(delay.poll(cx));
166 let Some(client) = this.client.as_ref() else {
167 this.state.set(BlockFutState::Complete);
168 return Poll::Ready(Err(TransportError::local_usage_str(
169 "provider was dropped",
170 )));
171 };
172 this.state.set(BlockFutState::Request {
173 call: Self::block_request_call(client, *this.block_number, *this.kind),
174 });
175 }
176 BlockFutStateProj::Ready { result } => {
177 let result = result.take().expect("polled BlockFut after completion");
178 this.state.set(BlockFutState::Complete);
179 return Poll::Ready(result);
180 }
181 BlockFutStateProj::Complete => panic!("polled BlockFut after completion"),
182 }
183 }
184 }
185}
186
187#[pin_project]
194#[derive(Debug)]
195pub(super) struct FetchHeadFut<HeaderResp>
196where
197 HeaderResp: HeaderResponse + RpcRecv,
198{
199 #[pin]
200 state: FetchHeadFutState<HeaderResp>,
201}
202
203#[pin_project(project = FetchHeadFutStateProj)]
204#[derive(Debug)]
205enum FetchHeadFutState<HeaderResp>
206where
207 HeaderResp: HeaderResponse + RpcRecv,
208{
209 Latest {
211 #[pin]
212 call: RpcCall<[(); 0], U64>,
213 },
214 Tagged {
216 #[pin]
217 call: RpcCall<(BlockNumberOrTag, bool), Option<HeaderResp>>,
218 },
219 Ready { result: Option<TransportResult<u64>> },
221 Complete,
223}
224
225impl<HeaderResp> FetchHeadFut<HeaderResp>
226where
227 HeaderResp: HeaderResponse + RpcRecv,
228{
229 pub(super) fn new(client: Arc<RpcClientInner>, tag: BlockNumberOrTag) -> Self {
230 let state = match tag {
231 BlockNumberOrTag::Number(number) => {
232 FetchHeadFutState::Ready { result: Some(Ok(number)) }
233 }
234 BlockNumberOrTag::Earliest => FetchHeadFutState::Ready { result: Some(Ok(0)) },
235 BlockNumberOrTag::Latest => {
236 FetchHeadFutState::Latest { call: client.request_noparams("eth_blockNumber") }
237 }
238 _ => FetchHeadFutState::Tagged {
239 call: client.request("eth_getBlockByNumber", (tag, false)),
240 },
241 };
242 Self { state }
243 }
244}
245
246impl<HeaderResp> Future for FetchHeadFut<HeaderResp>
247where
248 HeaderResp: HeaderResponse + RpcRecv,
249{
250 type Output = TransportResult<u64>;
251
252 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
253 let mut this = self.project();
254
255 match this.state.as_mut().project() {
256 FetchHeadFutStateProj::Latest { call } => {
257 let result = ready!(call.poll(cx)).map(|n| n.to());
258 this.state.set(FetchHeadFutState::Complete);
259 Poll::Ready(result)
260 }
261 FetchHeadFutStateProj::Tagged { call } => {
262 let result = match ready!(call.poll(cx)) {
263 Ok(resp) => resp.map(|header| header.number()).ok_or(RpcError::NullResp),
264 Err(err) => Err(err),
265 };
266 this.state.set(FetchHeadFutState::Complete);
267 Poll::Ready(result)
268 }
269 FetchHeadFutStateProj::Ready { result } => {
270 let result = result.take().expect("polled FetchHeadFut after completion");
271 this.state.set(FetchHeadFutState::Complete);
272 Poll::Ready(result)
273 }
274 FetchHeadFutStateProj::Complete => panic!("polled FetchHeadFut after completion"),
275 }
276 }
277}
278
279#[derive(Debug, Clone)]
281#[must_use = "this builder does nothing unless you call `.into_stream`"]
282pub struct WatchBlocksFrom<N: Network> {
283 client: WeakClient,
284 start_block: u64,
285 poll_interval: Duration,
286 block_tag: BlockNumberOrTag,
287 kind: BlockTransactionsKind,
288 _phantom: PhantomData<fn() -> N>,
289}
290
291impl<N: Network> WatchBlocksFrom<N> {
292 pub(crate) const fn new(client: WeakClient, start_block: u64) -> Self {
294 Self {
295 client,
296 start_block,
297 poll_interval: DEFAULT_POLL_INTERVAL,
298 block_tag: BlockNumberOrTag::Latest,
299 kind: BlockTransactionsKind::Hashes,
300 _phantom: PhantomData,
301 }
302 }
303
304 pub const fn full(mut self) -> Self {
306 self.kind = BlockTransactionsKind::Full;
307 self
308 }
309
310 pub const fn hashes(mut self) -> Self {
312 self.kind = BlockTransactionsKind::Hashes;
313 self
314 }
315
316 pub const fn poll_interval(mut self, poll_interval: Duration) -> Self {
318 self.poll_interval = poll_interval;
319 self
320 }
321
322 pub const fn block_tag(mut self, block_tag: BlockNumberOrTag) -> Self {
324 self.block_tag = block_tag;
325 self
326 }
327
328 pub const fn canonical(self) -> WatchCanonicalBlocksFrom<N> {
331 WatchCanonicalBlocksFrom::new(self)
332 }
333
334 pub(super) fn get_block(&self, block_number: u64) -> BlockFut<N::BlockResponse> {
336 self.client
337 .upgrade()
338 .map(|client| BlockFut::new(client, block_number, self.kind, self.poll_interval))
339 .unwrap_or_else(|| BlockFut::err(RpcError::local_usage_str("provider was dropped")))
340 }
341
342 pub const fn into_stream(self) -> WatchBlocksFromStream<N> {
369 let current_block = self.start_block;
370 WatchBlocksFromStream {
371 inner: self,
372 current_block,
373 head: 0,
374 state: WatchBlocksFromState::FetchHead,
375 }
376 }
377}
378
379#[derive(Debug)]
386pub struct WatchBlocksFromStream<N: Network> {
387 inner: WatchBlocksFrom<N>,
388 current_block: u64,
389 head: u64,
390 state: WatchBlocksFromState<N>,
391}
392
393#[derive(Debug)]
394enum WatchBlocksFromState<N: Network> {
395 FetchHead,
397 FetchingHead { fut: FetchHeadFut<N::HeaderResponse> },
399 Yielding,
401 Sleeping { delay: PollIntervalDelay },
403 Done,
405}
406
407impl<N: Network> Stream for WatchBlocksFromStream<N> {
408 type Item = BlockFut<N::BlockResponse>;
409
410 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
411 let this = self.get_mut();
412
413 loop {
414 match &mut this.state {
415 WatchBlocksFromState::FetchHead => {
416 let Some(client) = this.inner.client.upgrade() else {
417 this.state = WatchBlocksFromState::Done;
418 continue;
419 };
420 let fut = FetchHeadFut::new(client, this.inner.block_tag);
421 this.state = WatchBlocksFromState::FetchingHead { fut };
422 }
423 WatchBlocksFromState::FetchingHead { fut } => {
424 match ready!(Pin::new(fut).poll(cx)) {
425 Ok(head) => {
426 this.head = head;
427 if this.current_block > head {
428 this.state = WatchBlocksFromState::Sleeping {
429 delay: PollIntervalDelay::new(this.inner.poll_interval),
430 };
431 } else {
432 this.state = WatchBlocksFromState::Yielding;
433 }
434 }
435 Err(err) => {
436 this.state = WatchBlocksFromState::Sleeping {
437 delay: PollIntervalDelay::new(this.inner.poll_interval),
438 };
439 return Poll::Ready(Some(BlockFut::err(err)));
440 }
441 }
442 }
443 WatchBlocksFromState::Yielding => {
444 if this.current_block > this.head {
445 this.state = WatchBlocksFromState::Sleeping {
446 delay: PollIntervalDelay::new(this.inner.poll_interval),
447 };
448 continue;
449 }
450
451 let next_block = this.current_block.saturating_add(1);
452 if next_block <= this.current_block {
453 let err = RpcError::local_usage_str(
454 "watch stream step did not advance block cursor",
455 );
456 this.state = WatchBlocksFromState::Sleeping {
457 delay: PollIntervalDelay::new(this.inner.poll_interval),
458 };
459 return Poll::Ready(Some(BlockFut::err(err)));
460 }
461
462 let Some(client) = this.inner.client.upgrade() else {
463 this.state = WatchBlocksFromState::Done;
464 continue;
465 };
466
467 let item_fut: BlockFut<N::BlockResponse> = BlockFut::new(
468 client,
469 this.current_block,
470 this.inner.kind,
471 this.inner.poll_interval,
472 );
473 this.current_block = next_block;
474 return Poll::Ready(Some(item_fut));
475 }
476 WatchBlocksFromState::Sleeping { delay } => {
477 ready!(delay.poll(cx));
478 this.state = WatchBlocksFromState::FetchHead;
479 }
480 WatchBlocksFromState::Done => return Poll::Ready(None),
481 }
482 }
483 }
484}
485
486#[cfg(test)]
487mod tests {
488 use super::*;
489 use crate::{Provider, ProviderBuilder};
490 use alloy_rpc_client::RpcClient;
491 use alloy_rpc_types_eth::Block;
492 use alloy_transport::{
493 layers::{RetryBackoffLayer, RetryPolicy},
494 mock::MockTransport,
495 };
496 use futures::StreamExt;
497 use tokio::time::timeout;
498
499 fn block(number: u64) -> Block {
500 let mut block: Block = Block::default();
501 block.header.inner.number = number;
502 block
503 }
504
505 #[tokio::test]
506 async fn streams_blocks_from_start_block() {
507 let asserter = alloy_transport::mock::Asserter::new();
508 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
509
510 asserter.push_success(&3_u64);
511 asserter.push_success(&Some(block(1)));
512 asserter.push_success(&Some(block(2)));
513 asserter.push_success(&Some(block(3)));
514
515 let mut stream = provider
516 .watch_blocks_from(1)
517 .block_tag(BlockNumberOrTag::Latest)
518 .poll_interval(Duration::from_millis(1))
519 .into_stream()
520 .buffered(1);
521
522 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
523 assert_eq!(first.header.number, 1);
524
525 let second =
526 timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
527 assert_eq!(second.header.number, 2);
528
529 let third = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
530 assert_eq!(third.header.number, 3);
531 }
532
533 #[tokio::test]
534 async fn advances_to_next_block_after_error() {
535 let asserter = alloy_transport::mock::Asserter::new();
536 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
537
538 asserter.push_success(&1_u64);
539 asserter.push_failure_msg("boom");
540 asserter.push_success(&2_u64);
541 asserter.push_success(&Some(block(2)));
542
543 let mut stream = provider
544 .watch_blocks_from(1)
545 .block_tag(BlockNumberOrTag::Latest)
546 .poll_interval(Duration::from_millis(1))
547 .into_stream()
548 .buffered(1);
549
550 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
551 assert!(first.is_err());
552
553 let second =
554 timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
555 assert_eq!(second.header.number, 2);
556 }
557
558 #[tokio::test]
559 async fn retries_same_block_after_null_response() {
560 let asserter = alloy_transport::mock::Asserter::new();
561 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
562
563 let no_block: Option<Block> = None;
564 asserter.push_success(&1_u64);
565 asserter.push_success(&no_block);
566 asserter.push_success(&Some(block(1)));
567 asserter.push_success(&2_u64);
568 asserter.push_success(&Some(block(2)));
569
570 let mut stream = provider
571 .watch_blocks_from(1)
572 .block_tag(BlockNumberOrTag::Latest)
573 .poll_interval(Duration::from_millis(1))
574 .into_stream()
575 .buffered(1);
576
577 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
578 assert_eq!(first.header.number, 1);
579
580 let second =
581 timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
582 assert_eq!(second.header.number, 2);
583 }
584
585 #[tokio::test]
586 async fn recovers_after_head_fetch_error() {
587 let asserter = alloy_transport::mock::Asserter::new();
588 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
589
590 asserter.push_failure_msg("head boom");
591 asserter.push_success(&1_u64);
592 asserter.push_success(&Some(block(1)));
593
594 let mut stream = provider
595 .watch_blocks_from(1)
596 .block_tag(BlockNumberOrTag::Latest)
597 .poll_interval(Duration::from_millis(1))
598 .into_stream()
599 .buffered(1);
600
601 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
602 assert!(first.is_err());
603
604 let second =
605 timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
606 assert_eq!(second.header.number, 1);
607 }
608
609 #[tokio::test]
610 async fn uses_provider_retry_layer() {
611 #[derive(Clone, Debug)]
612 struct AlwaysRetryPolicy;
613
614 impl RetryPolicy for AlwaysRetryPolicy {
615 fn should_retry(&self, _error: &alloy_transport::TransportError) -> bool {
616 true
617 }
618
619 fn backoff_hint(&self, _error: &alloy_transport::TransportError) -> Option<Duration> {
620 None
621 }
622 }
623
624 let asserter = alloy_transport::mock::Asserter::new();
625 let retry_layer = RetryBackoffLayer::new_with_policy(3, 0, 10_000, AlwaysRetryPolicy);
626 let client = RpcClient::builder()
627 .layer(retry_layer)
628 .transport(MockTransport::new(asserter.clone()), true);
629 let provider = ProviderBuilder::new().connect_client(client);
630
631 asserter.push_failure_msg("temporary head error");
632 asserter.push_success(&1_u64);
633 asserter.push_failure_msg("temporary block error");
634 asserter.push_success(&Some(block(1)));
635
636 let mut stream = provider
637 .watch_blocks_from(1)
638 .block_tag(BlockNumberOrTag::Latest)
639 .poll_interval(Duration::from_millis(1))
640 .into_stream()
641 .buffered(1);
642
643 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
644 assert_eq!(first.header.number, 1);
645 }
646
647 #[tokio::test]
648 async fn waits_until_head_reaches_start_block() {
649 let asserter = alloy_transport::mock::Asserter::new();
650 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
651
652 asserter.push_success(&0_u64);
653 asserter.push_success(&1_u64);
654 asserter.push_success(&Some(block(1)));
655
656 let mut stream = provider
657 .watch_blocks_from(1)
658 .block_tag(BlockNumberOrTag::Latest)
659 .poll_interval(Duration::from_millis(1))
660 .into_stream()
661 .buffered(1);
662
663 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
664 assert_eq!(first.header.number, 1);
665 }
666
667 #[tokio::test]
668 async fn fixed_block_tag_number_does_not_fetch_head() {
669 let asserter = alloy_transport::mock::Asserter::new();
670 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
671
672 asserter.push_success(&Some(block(5)));
673
674 let mut stream = provider
675 .watch_blocks_from(5)
676 .block_tag(BlockNumberOrTag::Number(5))
677 .poll_interval(Duration::from_millis(1))
678 .into_stream()
679 .buffered(1);
680
681 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
682 assert_eq!(first.header.number, 5);
683 }
684
685 #[tokio::test]
686 async fn earliest_block_tag_starts_at_zero() {
687 let asserter = alloy_transport::mock::Asserter::new();
688 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
689
690 asserter.push_success(&Some(block(0)));
691
692 let mut stream = provider
693 .watch_blocks_from(0)
694 .block_tag(BlockNumberOrTag::Earliest)
695 .poll_interval(Duration::from_millis(1))
696 .into_stream()
697 .buffered(1);
698
699 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
700 assert_eq!(first.header.number, 0);
701 }
702
703 #[tokio::test]
704 async fn stream_ends_when_provider_is_dropped() {
705 let provider =
706 ProviderBuilder::new().connect_mocked_client(alloy_transport::mock::Asserter::new());
707 let mut stream = provider.watch_blocks_from(0).into_stream();
708 drop(provider);
709
710 let next = timeout(Duration::from_secs(1), stream.next()).await.unwrap();
711 assert!(next.is_none());
712 }
713
714 #[tokio::test]
715 async fn yielded_future_outlives_provider() {
716 let asserter = alloy_transport::mock::Asserter::new();
717 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
718
719 asserter.push_success(&1_u64);
720 asserter.push_success(&Some(block(1)));
721
722 let mut stream = provider
723 .watch_blocks_from(1)
724 .block_tag(BlockNumberOrTag::Latest)
725 .poll_interval(Duration::from_millis(1))
726 .into_stream();
727
728 let fut = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
729 drop(stream);
730 drop(provider);
731
732 let block = timeout(Duration::from_secs(1), fut).await.unwrap().unwrap();
733 assert_eq!(block.header.number, 1);
734 }
735
736 #[tokio::test]
737 async fn multiple_yielded_futures_outlive_provider() {
738 let asserter = alloy_transport::mock::Asserter::new();
739 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
740
741 asserter.push_success(&2_u64);
742 asserter.push_success(&Some(block(1)));
743 asserter.push_success(&Some(block(2)));
744
745 let mut stream = provider
746 .watch_blocks_from(1)
747 .block_tag(BlockNumberOrTag::Latest)
748 .poll_interval(Duration::from_millis(1))
749 .into_stream();
750
751 let fut1 = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
752 let fut2 = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
753 drop(provider);
754
755 let first = timeout(Duration::from_secs(1), fut1).await.unwrap().unwrap();
756 let second = timeout(Duration::from_secs(1), fut2).await.unwrap().unwrap();
757 assert_eq!(first.header.number, 1);
758 assert_eq!(second.header.number, 2);
759 }
760
761 #[tokio::test]
762 async fn errors_when_cursor_cannot_advance() {
763 let asserter = alloy_transport::mock::Asserter::new();
764 let provider = ProviderBuilder::new().connect_mocked_client(asserter);
765
766 let mut stream = provider
767 .watch_blocks_from(u64::MAX)
768 .block_tag(BlockNumberOrTag::Number(u64::MAX))
769 .poll_interval(Duration::from_millis(1))
770 .into_stream()
771 .buffered(1);
772
773 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
774 let err = first.unwrap_err();
775 assert!(err.is_local_usage_error());
776 }
777
778 #[tokio::test]
779 async fn future_stream_can_be_buffered() {
780 let asserter = alloy_transport::mock::Asserter::new();
781 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
782
783 asserter.push_success(&2_u64);
784 asserter.push_success(&Some(block(1)));
785 asserter.push_success(&Some(block(2)));
786
787 let mut stream = provider
788 .watch_blocks_from(1)
789 .block_tag(BlockNumberOrTag::Latest)
790 .poll_interval(Duration::from_millis(1))
791 .into_stream()
792 .buffered(2);
793
794 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
795 assert_eq!(first.header.number, 1);
796
797 let second =
798 timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
799 assert_eq!(second.header.number, 2);
800 }
801
802 #[tokio::test]
803 async fn buffered_stream_does_not_skip_after_null_response() {
804 let asserter = alloy_transport::mock::Asserter::new();
805 let provider = ProviderBuilder::new().connect_mocked_client(asserter.clone());
806
807 let no_block: Option<Block> = None;
808 asserter.push_success(&2_u64);
809 asserter.push_success(&no_block);
810 asserter.push_success(&Some(block(2)));
811 asserter.push_success(&Some(block(1)));
812
813 let mut stream = provider
814 .watch_blocks_from(1)
815 .block_tag(BlockNumberOrTag::Latest)
816 .poll_interval(Duration::from_millis(1))
817 .into_stream()
818 .buffered(2);
819
820 let first = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
821 assert_eq!(first.header.number, 1);
822
823 let second =
824 timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap();
825 assert_eq!(second.header.number, 2);
826 }
827}