1use super::{
2 watch_blocks_from::{FetchHeadFut, PollIntervalDelay, DEFAULT_POLL_INTERVAL},
3 BlockFut, WatchCanonicalLogsFrom,
4};
5use crate::transport::TransportErrorKind;
6use alloy_consensus::BlockHeader;
7use alloy_eips::BlockNumberOrTag;
8use alloy_json_rpc::RpcError;
9use alloy_network::{BlockResponse as _, Network};
10use alloy_network_primitives::{BlockTransactionsKind, HeaderResponse};
11use alloy_primitives::B256;
12use alloy_rpc_client::{RpcCall, RpcClientInner, WeakClient};
13use alloy_rpc_types_eth::{Filter, Log};
14use alloy_transport::{TransportError, TransportResult};
15use futures::{ready, Stream};
16use pin_project::pin_project;
17use std::{
18 future::Future,
19 marker::PhantomData,
20 pin::Pin,
21 sync::Arc,
22 task::{Context, Poll},
23 time::Duration,
24};
25
26#[derive(Clone, Debug)]
32pub struct BlockLogs<N: Network> {
33 pub block: N::BlockResponse,
35 pub logs: Vec<Log>,
37}
38
39impl<N: Network> BlockLogs<N> {
40 pub fn header(&self) -> &N::HeaderResponse {
42 self.block.header()
43 }
44}
45
46#[derive(Clone, Debug)]
85#[must_use = "this builder does nothing unless you call `.into_stream`"]
86pub struct WatchLogsFrom<N: Network> {
87 client: WeakClient,
88 filter: Filter,
89 start_block: u64,
90 poll_interval: Duration,
91 block_tag: BlockNumberOrTag,
92 kind: BlockTransactionsKind,
93 _phantom: PhantomData<fn() -> N>,
94}
95
96impl<N: Network> WatchLogsFrom<N> {
97 pub(crate) const fn new(client: WeakClient, start_block: u64, filter: Filter) -> Self {
98 Self {
99 client,
100 filter,
101 start_block,
102 poll_interval: DEFAULT_POLL_INTERVAL,
103 block_tag: BlockNumberOrTag::Latest,
104 kind: BlockTransactionsKind::Hashes,
105 _phantom: PhantomData,
106 }
107 }
108
109 pub const fn full(mut self) -> Self {
111 self.kind = BlockTransactionsKind::Full;
112 self
113 }
114
115 pub const fn hashes(mut self) -> Self {
117 self.kind = BlockTransactionsKind::Hashes;
118 self
119 }
120
121 pub const fn poll_interval(mut self, poll_interval: Duration) -> Self {
123 self.poll_interval = poll_interval;
124 self
125 }
126
127 pub const fn block_tag(mut self, block_tag: BlockNumberOrTag) -> Self {
132 self.block_tag = block_tag;
133 self
134 }
135
136 pub const fn canonical(self) -> WatchCanonicalLogsFrom<N> {
139 WatchCanonicalLogsFrom::new(self)
140 }
141
142 pub(super) fn get_block_logs(&self, block_number: u64) -> BlockLogsFut<N> {
144 self.client
145 .upgrade()
146 .map(|client| {
147 BlockLogsFut::new(
148 client,
149 block_number,
150 self.filter.clone(),
151 self.poll_interval,
152 self.kind,
153 )
154 })
155 .unwrap_or_else(|| BlockLogsFut::err(RpcError::local_usage_str("provider was dropped")))
156 }
157
158 pub const fn into_stream(self) -> WatchLogsFromStream<N> {
166 let current_block = self.start_block;
167 WatchLogsFromStream {
168 inner: self,
169 current_block,
170 head: 0,
171 state: WatchLogsFromState::FetchHead,
172 }
173 }
174}
175
176#[derive(Debug)]
181pub struct WatchLogsFromStream<N: Network> {
182 inner: WatchLogsFrom<N>,
183 current_block: u64,
184 head: u64,
185 state: WatchLogsFromState<N>,
186}
187
188#[derive(Debug)]
189enum WatchLogsFromState<N: Network> {
190 FetchHead,
192 FetchingHead { fut: FetchHeadFut<N::HeaderResponse> },
194 Yielding,
196 Sleeping { delay: PollIntervalDelay },
198 Done,
200}
201
202impl<N: Network> Stream for WatchLogsFromStream<N> {
203 type Item = BlockLogsFut<N>;
204
205 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
206 let this = self.get_mut();
207
208 loop {
209 match &mut this.state {
210 WatchLogsFromState::FetchHead => {
211 let Some(client) = this.inner.client.upgrade() else {
212 this.state = WatchLogsFromState::Done;
213 continue;
214 };
215 let fut = FetchHeadFut::new(client, this.inner.block_tag);
216 this.state = WatchLogsFromState::FetchingHead { fut };
217 }
218 WatchLogsFromState::FetchingHead { fut } => match ready!(Pin::new(fut).poll(cx)) {
219 Ok(head) => {
220 this.head = head;
221 if this.current_block > head {
222 this.state = WatchLogsFromState::Sleeping {
223 delay: PollIntervalDelay::new(this.inner.poll_interval),
224 };
225 } else {
226 this.state = WatchLogsFromState::Yielding;
227 }
228 }
229 Err(err) => {
230 this.state = WatchLogsFromState::Sleeping {
231 delay: PollIntervalDelay::new(this.inner.poll_interval),
232 };
233 return Poll::Ready(Some(BlockLogsFut::err(err)));
234 }
235 },
236 WatchLogsFromState::Yielding => {
237 if this.current_block > this.head {
238 this.state = WatchLogsFromState::Sleeping {
239 delay: PollIntervalDelay::new(this.inner.poll_interval),
240 };
241 continue;
242 }
243
244 let next_block = this.current_block.saturating_add(1);
245 if next_block <= this.current_block {
246 let err = RpcError::local_usage_str(
247 "watch logs stream step did not advance block cursor",
248 );
249 this.state = WatchLogsFromState::Sleeping {
250 delay: PollIntervalDelay::new(this.inner.poll_interval),
251 };
252 return Poll::Ready(Some(BlockLogsFut::err(err)));
253 }
254
255 let Some(client) = this.inner.client.upgrade() else {
256 this.state = WatchLogsFromState::Done;
257 continue;
258 };
259
260 let item_fut = BlockLogsFut::new(
261 client,
262 this.current_block,
263 this.inner.filter.clone(),
264 this.inner.poll_interval,
265 this.inner.kind,
266 );
267 this.current_block = next_block;
268 return Poll::Ready(Some(item_fut));
269 }
270 WatchLogsFromState::Sleeping { delay } => {
271 ready!(delay.poll(cx));
272 this.state = WatchLogsFromState::FetchHead;
273 }
274 WatchLogsFromState::Done => return Poll::Ready(None),
275 }
276 }
277 }
278}
279
280#[pin_project]
297#[derive(Debug)]
298pub struct BlockLogsFut<N: Network> {
299 client: Option<Arc<RpcClientInner>>,
300 block_number: u64,
301 filter: Filter,
302 #[pin]
303 state: BlockLogsFutState<N>,
304}
305
306#[pin_project(project = BlockLogsFutStateProj)]
307#[derive(Debug)]
308enum BlockLogsFutState<N: Network> {
309 FetchBlockAndLogs {
316 block: Option<TransportResult<N::BlockResponse>>,
317 range_logs: Option<TransportResult<Vec<Log>>>,
318 #[pin]
319 block_fut: BlockFut<N::BlockResponse>,
320 #[pin]
321 logs_call: RpcCall<(Filter,), Vec<Log>>,
322 },
323 FetchLogs {
329 block: Option<N::BlockResponse>,
330 block_number: u64,
331 block_hash: B256,
332 #[pin]
333 logs_call: RpcCall<(Filter,), Vec<Log>>,
334 },
335 Ready { result: Option<TransportResult<BlockLogs<N>>> },
337 Complete,
339}
340
341impl<N: Network> BlockLogsFut<N> {
342 fn new(
343 client: Arc<RpcClientInner>,
344 block_number: u64,
345 filter: Filter,
346 poll_interval: Duration,
347 kind: BlockTransactionsKind,
348 ) -> Self {
349 let block_fut = Self::block_fut(&client, block_number, poll_interval, kind);
350 let logs_call = Self::logs_call(&client, filter.clone().select(block_number));
351 Self {
352 client: Some(client),
353 block_number,
354 filter,
355 state: BlockLogsFutState::FetchBlockAndLogs {
356 block: None,
357 range_logs: None,
358 block_fut,
359 logs_call,
360 },
361 }
362 }
363
364 fn err(err: TransportError) -> Self {
365 Self {
366 client: None,
367 block_number: 0,
368 filter: Filter::new(),
369 state: BlockLogsFutState::Ready { result: Some(Err(err)) },
370 }
371 }
372
373 fn block_fut(
374 client: &Arc<RpcClientInner>,
375 block_number: u64,
376 poll_interval: Duration,
377 kind: BlockTransactionsKind,
378 ) -> BlockFut<N::BlockResponse> {
379 BlockFut::new(client.clone(), block_number, kind, poll_interval)
380 }
381
382 fn logs_call(client: &Arc<RpcClientInner>, filter: Filter) -> RpcCall<(Filter,), Vec<Log>> {
383 client.request("eth_getLogs", (filter,))
384 }
385}
386
387impl<N: Network> Future for BlockLogsFut<N> {
388 type Output = TransportResult<BlockLogs<N>>;
389
390 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
391 let mut this = self.project();
392
393 loop {
394 match this.state.as_mut().project() {
395 BlockLogsFutStateProj::FetchBlockAndLogs {
396 block,
397 range_logs,
398 block_fut,
399 logs_call,
400 } => {
401 if block.is_none() {
402 if let Poll::Ready(result) = block_fut.poll(cx) {
403 *block = Some(result.and_then(|block| {
404 let block_number = block.header().number();
405 if block_number != *this.block_number {
406 Err(TransportErrorKind::custom_str(
407 "eth_getBlockByNumber returned a block with an unexpected number",
408 ))
409 } else {
410 Ok(block)
411 }
412 }));
413 }
414 }
415
416 if range_logs.is_none() {
417 if let Poll::Ready(result) = logs_call.poll(cx) {
418 *range_logs = Some(result);
419 }
420 }
421
422 if block.as_ref().is_some_and(Result::is_err) {
423 let Some(Err(err)) = block.take() else { unreachable!() };
424 this.state.set(BlockLogsFutState::Complete);
425 return Poll::Ready(Err(err));
426 }
427
428 if block.is_none() || range_logs.is_none() {
429 return Poll::Pending;
430 }
431
432 let Ok(block_result) = block.take().expect("checked block is ready") else {
433 unreachable!("block errors are handled above");
434 };
435 let logs_result = range_logs.take().expect("checked logs are ready");
436
437 let block_number = block_result.header().number();
438 let block_hash = block_result.header().hash();
439 if let Ok(logs) = logs_result {
440 if let Some(logs) =
441 normalize_range_logs_if_matches(logs, block_number, block_hash)
442 {
443 this.state.set(BlockLogsFutState::Complete);
444 return Poll::Ready(Ok(BlockLogs { block: block_result, logs }));
445 }
446 }
447
448 let Some(client) = this.client.as_ref() else {
449 this.state.set(BlockLogsFutState::Complete);
450 return Poll::Ready(Err(TransportError::local_usage_str(
451 "provider was dropped",
452 )));
453 };
454 let logs_call =
455 Self::logs_call(client, this.filter.clone().at_block_hash(block_hash));
456 this.state.set(BlockLogsFutState::FetchLogs {
457 block: Some(block_result),
458 block_number,
459 block_hash,
460 logs_call,
461 });
462 }
463 BlockLogsFutStateProj::FetchLogs { block, block_number, block_hash, logs_call } => {
464 match ready!(logs_call.poll(cx))
465 .and_then(|logs| normalize_logs(logs, *block_number, *block_hash))
466 {
467 Ok(logs) => {
468 let block = block.take().expect("block is present while fetching logs");
469 this.state.set(BlockLogsFutState::Complete);
470 return Poll::Ready(Ok(BlockLogs { block, logs }));
471 }
472 Err(err) => {
473 this.state.set(BlockLogsFutState::Complete);
474 return Poll::Ready(Err(err));
475 }
476 }
477 }
478 BlockLogsFutStateProj::Ready { result } => {
479 let result = result.take().expect("polled BlockLogsFut after completion");
480 this.state.set(BlockLogsFutState::Complete);
481 return Poll::Ready(result);
482 }
483 BlockLogsFutStateProj::Complete => panic!("polled BlockLogsFut after completion"),
484 }
485 }
486 }
487}
488
489fn normalize_logs(
491 mut logs: Vec<Log>,
492 block_number: u64,
493 block_hash: B256,
494) -> TransportResult<Vec<Log>> {
495 for log in &mut logs {
496 if log.block_number.is_some_and(|number| number != block_number) {
497 return Err(TransportErrorKind::custom_str(
498 "eth_getLogs returned a log with an unexpected block number",
499 ));
500 }
501 if log.block_hash.is_some_and(|hash| hash != block_hash) {
502 return Err(TransportErrorKind::custom_str(
503 "eth_getLogs returned a log with an unexpected block hash",
504 ));
505 }
506 log.block_number = Some(block_number);
507 log.block_hash = Some(block_hash);
508 log.removed = false;
509 }
510 Ok(logs)
511}
512
513fn normalize_range_logs_if_matches(
515 logs: Vec<Log>,
516 block_number: u64,
517 block_hash: B256,
518) -> Option<Vec<Log>> {
519 if logs.is_empty() {
520 return None;
521 }
522
523 let all_logs_match = logs.iter().all(|log| {
524 log.block_hash == Some(block_hash)
525 && log.block_number.is_none_or(|number| number == block_number)
526 });
527 if !all_logs_match {
528 return None;
529 }
530
531 Some(
532 normalize_logs(logs, block_number, block_hash)
533 .expect("range logs were checked against the target block"),
534 )
535}
536
#[cfg(test)]
mod tests {
    use super::{
        super::watch_logs_test_utils::{assert_batch, block, log, MockChain},
        *,
    };
    use crate::Provider;
    use alloy_network::Ethereum;
    use alloy_rpc_types_eth::Block;
    use futures::{Stream, StreamExt};
    use tokio::time::timeout;

    /// Longest time a test waits for a single item or future.
    const ITEM_TIMEOUT: Duration = Duration::from_secs(1);

    /// Finishes a builder with a 1ms poll interval and resolves its futures one at a time.
    fn buffered_stream(
        builder: WatchLogsFrom<Ethereum>,
    ) -> impl Stream<Item = TransportResult<BlockLogs<Ethereum>>> + Unpin {
        builder.poll_interval(Duration::from_millis(1)).into_stream().buffered(1)
    }

    /// Waits for the next stream item; panics on timeout or when the stream has ended.
    async fn next_result<S>(stream: &mut S) -> TransportResult<BlockLogs<Ethereum>>
    where
        S: Stream<Item = TransportResult<BlockLogs<Ethereum>>> + Unpin,
    {
        timeout(ITEM_TIMEOUT, stream.next()).await.unwrap().unwrap()
    }

    /// Waits for the next stream item and unwraps it as a successful batch.
    async fn next_batch<S>(stream: &mut S) -> BlockLogs<Ethereum>
    where
        S: Stream<Item = TransportResult<BlockLogs<Ethereum>>> + Unpin,
    {
        next_result(stream).await.unwrap()
    }

    /// Two consecutive blocks are delivered in order with block-number log requests only.
    #[tokio::test]
    async fn streams_log_batches_from_start_block() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)]), (block(2, 2, 1), vec![log(2, 2, 0)])]);

        let provider = chain.provider();
        let mut stream = buffered_stream(
            provider.watch_logs_from(1, &Filter::new()).block_tag(BlockNumberOrTag::Latest),
        );

        let first = next_batch(&mut stream).await;
        assert_batch(&first, 1, 1, false, 1);
        let second = next_batch(&mut stream).await;
        assert_batch(&second, 2, 2, false, 1);
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, false]);
    }

    /// An empty block-number result triggers a second request by block hash.
    #[tokio::test]
    async fn falls_back_to_block_hash_logs_for_empty_range_logs() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![]), (block(2, 2, 1), vec![log(2, 2, 0)])]);

        let provider = chain.provider();
        let mut stream = buffered_stream(
            provider.watch_logs_from(1, &Filter::new()).block_tag(BlockNumberOrTag::Latest),
        );

        let first = next_batch(&mut stream).await;
        assert_batch(&first, 1, 1, false, 0);
        let second = next_batch(&mut stream).await;
        assert_batch(&second, 2, 2, false, 1);
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, true, false]);
    }

    /// Block-number logs that do not belong to the fetched block trigger the hash request.
    #[tokio::test]
    async fn falls_back_to_block_hash_logs_for_mismatched_range_logs() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)])]);
        chain.override_next_range_logs(vec![log(1, 11, 0)]);

        let provider = chain.provider();
        let mut stream = buffered_stream(
            provider.watch_logs_from(1, &Filter::new()).block_tag(BlockNumberOrTag::Number(1)),
        );

        let batch = next_batch(&mut stream).await;
        assert_batch(&batch, 1, 1, false, 1);
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, true]);
    }

    /// `full()` and `hashes()` decide which kind of block request is sent.
    #[tokio::test]
    async fn full_and_hashes_configure_block_fetch_kind() {
        for want_full in [true, false] {
            let chain = MockChain::new();
            chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)])]);

            let provider = chain.provider();
            let builder = provider.watch_logs_from(1, &Filter::new());
            let builder = if want_full { builder.full() } else { builder.hashes() };
            let mut stream = buffered_stream(builder.block_tag(BlockNumberOrTag::Number(1)));

            let batch = next_batch(&mut stream).await;
            assert_batch(&batch, 1, 1, false, 1);
            assert_eq!(chain.block_request_full_flags(), vec![want_full]);
        }
    }

    /// With a retrying provider a failing log request does not cause a block to be skipped.
    #[tokio::test]
    async fn provider_retry_layer_retries_log_error_without_skipping_block() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)]), (block(2, 2, 1), vec![log(2, 2, 0)])]);
        chain.fail_next_logs(1);

        let provider = chain.provider_with_retry();
        let mut stream = buffered_stream(
            provider.watch_logs_from(1, &Filter::new()).block_tag(BlockNumberOrTag::Latest),
        );

        let first = next_batch(&mut stream).await;
        assert_batch(&first, 1, 1, false, 1);
        let second = next_batch(&mut stream).await;
        assert_batch(&second, 2, 2, false, 1);
    }

    /// A chain change right after the log request still yields the block that was fetched.
    #[tokio::test]
    async fn emits_hash_pinned_candidate_when_chain_changes_after_log_fetch() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)]), (block(2, 2, 1), vec![log(2, 2, 0)])]);

        let provider = chain.provider();
        let mut stream = buffered_stream(
            provider.watch_logs_from(1, &Filter::new()).block_tag(BlockNumberOrTag::Latest),
        );

        let first = next_batch(&mut stream).await;
        assert_batch(&first, 1, 1, false, 1);

        chain.reorg_after_next_log_success(vec![
            (block(2, 22, 1), vec![log(2, 22, 0)]),
            (block(3, 33, 22), vec![log(3, 33, 0)]),
        ]);

        let second = next_batch(&mut stream).await;
        assert_batch(&second, 2, 2, false, 1);
        let third = next_batch(&mut stream).await;
        assert_batch(&third, 3, 33, false, 1);
    }

    /// Logs without block metadata are stamped and their `removed` flag is cleared.
    #[tokio::test]
    async fn normalizes_log_metadata_and_clears_removed_flag() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![Log { removed: true, ..Default::default() }])]);

        let provider = chain.provider();
        let mut stream = buffered_stream(
            provider.watch_logs_from(1, &Filter::new()).block_tag(BlockNumberOrTag::Latest),
        );

        let batch = next_batch(&mut stream).await;
        assert_batch(&batch, 1, 1, false, 1);
    }

    /// Logs whose block number contradicts the fetched block make the item fail.
    #[tokio::test]
    async fn rejects_logs_with_conflicting_metadata() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(2, 1, 0)])]);

        let provider = chain.provider();
        let mut stream = buffered_stream(
            provider.watch_logs_from(1, &Filter::new()).block_tag(BlockNumberOrTag::Latest),
        );

        let err = next_result(&mut stream).await.unwrap_err();
        assert!(format!("{err}").contains("unexpected block number"));
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, true]);
    }

    /// The stream terminates once the provider is gone.
    #[tokio::test]
    async fn stream_ends_when_provider_is_dropped() {
        let chain = MockChain::new();
        let provider = chain.provider();
        let mut stream = provider.watch_logs_from(0, &Filter::new()).into_stream();
        drop(provider);

        let item = timeout(ITEM_TIMEOUT, stream.next()).await.unwrap();
        assert!(item.is_none());
    }

    /// A future that was already handed out can finish after stream and provider are dropped.
    #[tokio::test]
    async fn yielded_future_outlives_provider() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream();

        let pending = timeout(ITEM_TIMEOUT, stream.next()).await.unwrap().unwrap();
        drop(stream);
        drop(provider);

        let batch = timeout(ITEM_TIMEOUT, pending).await.unwrap().unwrap();
        assert_batch(&batch, 1, 1, false, 1);
    }

    /// At `u64::MAX` the cursor cannot advance, which is reported as a local usage error.
    #[tokio::test]
    async fn errors_when_cursor_cannot_advance() {
        let chain = MockChain::new();
        let mut max_block: Block = block(u64::MAX, 1, 0);
        max_block.header.inner.number = u64::MAX;
        chain.extend(&[(max_block, vec![log(u64::MAX, 1, 0)])]);

        let provider = chain.provider();
        let mut stream = buffered_stream(
            provider
                .watch_logs_from(u64::MAX, &Filter::new())
                .block_tag(BlockNumberOrTag::Number(u64::MAX)),
        );

        let err = next_result(&mut stream).await.unwrap_err();
        assert!(err.is_local_usage_error());
    }
}