Skip to main content

alloy_provider/provider/
watch_logs_from.rs

1use super::{
2    watch_blocks_from::{FetchHeadFut, PollIntervalDelay, DEFAULT_POLL_INTERVAL},
3    BlockFut, WatchCanonicalLogsFrom,
4};
5use crate::transport::TransportErrorKind;
6use alloy_consensus::BlockHeader;
7use alloy_eips::BlockNumberOrTag;
8use alloy_json_rpc::RpcError;
9use alloy_network::{BlockResponse as _, Network};
10use alloy_network_primitives::{BlockTransactionsKind, HeaderResponse};
11use alloy_primitives::B256;
12use alloy_rpc_client::{RpcCall, RpcClientInner, WeakClient};
13use alloy_rpc_types_eth::{Filter, Log};
14use alloy_transport::{TransportError, TransportResult};
15use futures::{ready, Stream};
16use pin_project::pin_project;
17use std::{
18    future::Future,
19    marker::PhantomData,
20    pin::Pin,
21    sync::Arc,
22    task::{Context, Poll},
23    time::Duration,
24};
25
/// Logs matching a filter for a single block.
///
/// This is the payload resolved by [`BlockLogsFut`] and carried by canonical log events. The
/// `block` field identifies the block whose logs were queried, and `logs` contains all matching
/// logs for that block. `logs` may be empty when the block contained no matching logs.
///
/// When produced by [`BlockLogsFut`], every entry in `logs` has passed through the
/// normalization step: its `block_number` and `block_hash` are set to the values of `block`,
/// and its `removed` flag is cleared.
#[derive(Clone, Debug)]
pub struct BlockLogs<N: Network> {
    /// The block these logs belong to.
    pub block: N::BlockResponse,
    /// Logs matching the configured filter for `block`.
    pub logs: Vec<Log>,
}
38
39impl<N: Network> BlockLogs<N> {
40    /// Returns the header for the block these logs belong to.
41    pub fn header(&self) -> &N::HeaderResponse {
42        self.block.header()
43    }
44}
45
/// A builder for streaming block log batches from a historical block.
///
/// `WatchLogsFrom` is the log-specific counterpart to [`super::WatchBlocksFrom`]. It does not
/// perform canonical reconciliation or emit removed events. Instead, it produces a stream of
/// [`BlockLogsFut`] values, one future per block height from `start_block` through the configured
/// `block_tag` head.
///
/// Each future fetches the block and a one-block log range concurrently. Non-empty range results
/// are used when every log matches the fetched block hash; otherwise the future falls back to logs
/// pinned to the fetched block hash. This keeps each resolved batch internally consistent even if
/// the canonical block at that height changes later.
///
/// # Examples
///
/// ```no_run
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// # use alloy_eips::BlockNumberOrTag;
/// # use alloy_provider::{Provider, ProviderBuilder};
/// # use alloy_rpc_types_eth::Filter;
/// # use futures::StreamExt;
///
/// let provider = ProviderBuilder::new().connect_http("http://localhost:8545".parse()?);
/// let filter = Filter::new();
///
/// let mut stream = provider
///     .watch_logs_from(20_000_000, &filter)
///     .block_tag(BlockNumberOrTag::Finalized)
///     .into_stream()
///     .buffered(4);
///
/// while let Some(batch) = stream.next().await {
///     let block_logs = batch?;
///     let _block = block_logs.block;
///     let _logs = block_logs.logs;
/// }
/// # Ok(())
/// # }
/// ```
#[derive(Clone, Debug)]
#[must_use = "this builder does nothing unless you call `.into_stream`"]
pub struct WatchLogsFrom<N: Network> {
    /// Weak handle to the RPC client; it is upgraded each time a request future is created.
    client: WeakClient,
    /// Base log filter, cloned for every per-block log request.
    filter: Filter,
    /// First block height the resulting stream yields a future for.
    start_block: u64,
    /// Delay used between head polls once caught up; also handed to each block fetch.
    poll_interval: Duration,
    /// Block tag passed to the head fetch that bounds how far the stream may advance.
    block_tag: BlockNumberOrTag,
    /// Whether fetched blocks carry full transactions or only transaction hashes.
    kind: BlockTransactionsKind,
    /// Ties the builder to the network `N` without storing a value of that type.
    _phantom: PhantomData<fn() -> N>,
}
95
96impl<N: Network> WatchLogsFrom<N> {
97    pub(crate) const fn new(client: WeakClient, start_block: u64, filter: Filter) -> Self {
98        Self {
99            client,
100            filter,
101            start_block,
102            poll_interval: DEFAULT_POLL_INTERVAL,
103            block_tag: BlockNumberOrTag::Latest,
104            kind: BlockTransactionsKind::Hashes,
105            _phantom: PhantomData,
106        }
107    }
108
109    /// Streams block log batches with full transaction bodies in the block response.
110    pub const fn full(mut self) -> Self {
111        self.kind = BlockTransactionsKind::Full;
112        self
113    }
114
115    /// Streams block log batches with transaction hashes only in the block response.
116    pub const fn hashes(mut self) -> Self {
117        self.kind = BlockTransactionsKind::Hashes;
118        self
119    }
120
121    /// Sets the poll interval used when the stream is caught up to the configured head tag.
122    pub const fn poll_interval(mut self, poll_interval: Duration) -> Self {
123        self.poll_interval = poll_interval;
124        self
125    }
126
127    /// Sets the head block tag used to determine stream progress.
128    ///
129    /// The stream fetches all block log batches from `start_block` through this head, then polls
130    /// again after [`poll_interval`](Self::poll_interval) once caught up.
131    pub const fn block_tag(mut self, block_tag: BlockNumberOrTag) -> Self {
132        self.block_tag = block_tag;
133        self
134    }
135
136    /// Converts this builder into a canonical-stream builder that emits
137    /// [`CanonicalEvent`](crate::CanonicalEvent) deltas on reorgs.
138    pub const fn canonical(self) -> WatchCanonicalLogsFrom<N> {
139        WatchCanonicalLogsFrom::new(self)
140    }
141
142    /// Creates a future that fetches one block/log batch by block number.
143    pub(super) fn get_block_logs(&self, block_number: u64) -> BlockLogsFut<N> {
144        self.client
145            .upgrade()
146            .map(|client| {
147                BlockLogsFut::new(
148                    client,
149                    block_number,
150                    self.filter.clone(),
151                    self.poll_interval,
152                    self.kind,
153                )
154            })
155            .unwrap_or_else(|| BlockLogsFut::err(RpcError::local_usage_str("provider was dropped")))
156    }
157
158    /// Stream block/log fetching futures from a historical block.
159    ///
160    /// The stream polls the configured head, yields futures for `current_block..=head`, then sleeps
161    /// for `poll_interval` once caught up. It intentionally mirrors
162    /// [`super::WatchBlocksFromStream`] so callers can apply
163    /// [`StreamExt::buffered`](futures::StreamExt::buffered) for concurrent RPC work while still
164    /// receiving resolved batches in block-number order.
165    pub const fn into_stream(self) -> WatchLogsFromStream<N> {
166        let current_block = self.start_block;
167        WatchLogsFromStream {
168            inner: self,
169            current_block,
170            head: 0,
171            state: WatchLogsFromState::FetchHead,
172        }
173    }
174}
175
/// A stream of block/log-fetching futures.
///
/// Each yielded [`BlockLogsFut`] fetches one block and a one-block log range concurrently, and
/// falls back to a log query pinned to the fetched block's hash when the range result cannot be
/// used as-is.
#[derive(Debug)]
pub struct WatchLogsFromStream<N: Network> {
    /// Builder configuration (client handle, filter, poll interval, head tag, block kind).
    inner: WatchLogsFrom<N>,
    /// Height of the next block a future will be yielded for.
    current_block: u64,
    /// Head height from the most recent successful head fetch; starts at `0`.
    head: u64,
    /// Current position in the fetch-head / yield / sleep cycle.
    state: WatchLogsFromState<N>,
}
187
/// Internal state machine driving [`WatchLogsFromStream`].
#[derive(Debug)]
enum WatchLogsFromState<N: Network> {
    /// Upgrade the client and begin fetching the current head.
    ///
    /// If the client can no longer be upgraded the stream moves to [`Done`](Self::Done).
    FetchHead,
    /// Polling the in-flight head-block-number future.
    FetchingHead { fut: FetchHeadFut<N::HeaderResponse> },
    /// Yielding block/log futures for `current_block..=head`.
    Yielding,
    /// Sleeping between poll cycles.
    ///
    /// Entered once the cursor has passed the head, and also after an error item was yielded.
    Sleeping { delay: PollIntervalDelay },
    /// Stream terminated.
    Done,
}
201
202impl<N: Network> Stream for WatchLogsFromStream<N> {
203    type Item = BlockLogsFut<N>;
204
205    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
206        let this = self.get_mut();
207
208        loop {
209            match &mut this.state {
210                WatchLogsFromState::FetchHead => {
211                    let Some(client) = this.inner.client.upgrade() else {
212                        this.state = WatchLogsFromState::Done;
213                        continue;
214                    };
215                    let fut = FetchHeadFut::new(client, this.inner.block_tag);
216                    this.state = WatchLogsFromState::FetchingHead { fut };
217                }
218                WatchLogsFromState::FetchingHead { fut } => match ready!(Pin::new(fut).poll(cx)) {
219                    Ok(head) => {
220                        this.head = head;
221                        if this.current_block > head {
222                            this.state = WatchLogsFromState::Sleeping {
223                                delay: PollIntervalDelay::new(this.inner.poll_interval),
224                            };
225                        } else {
226                            this.state = WatchLogsFromState::Yielding;
227                        }
228                    }
229                    Err(err) => {
230                        this.state = WatchLogsFromState::Sleeping {
231                            delay: PollIntervalDelay::new(this.inner.poll_interval),
232                        };
233                        return Poll::Ready(Some(BlockLogsFut::err(err)));
234                    }
235                },
236                WatchLogsFromState::Yielding => {
237                    if this.current_block > this.head {
238                        this.state = WatchLogsFromState::Sleeping {
239                            delay: PollIntervalDelay::new(this.inner.poll_interval),
240                        };
241                        continue;
242                    }
243
244                    let next_block = this.current_block.saturating_add(1);
245                    if next_block <= this.current_block {
246                        let err = RpcError::local_usage_str(
247                            "watch logs stream step did not advance block cursor",
248                        );
249                        this.state = WatchLogsFromState::Sleeping {
250                            delay: PollIntervalDelay::new(this.inner.poll_interval),
251                        };
252                        return Poll::Ready(Some(BlockLogsFut::err(err)));
253                    }
254
255                    let Some(client) = this.inner.client.upgrade() else {
256                        this.state = WatchLogsFromState::Done;
257                        continue;
258                    };
259
260                    let item_fut = BlockLogsFut::new(
261                        client,
262                        this.current_block,
263                        this.inner.filter.clone(),
264                        this.inner.poll_interval,
265                        this.inner.kind,
266                    );
267                    this.current_block = next_block;
268                    return Poll::Ready(Some(item_fut));
269                }
270                WatchLogsFromState::Sleeping { delay } => {
271                    ready!(delay.poll(cx));
272                    this.state = WatchLogsFromState::FetchHead;
273                }
274                WatchLogsFromState::Done => return Poll::Ready(None),
275            }
276        }
277    }
278}
279
/// Future that resolves to a block/log batch for one block height.
///
/// `BlockLogsFut` is the item produced by [`WatchLogsFromStream`]. It performs the two pieces of
/// work needed for one height:
///
/// - fetch the block with `eth_getBlockByNumber`;
/// - fetch matching logs for the same numeric height with `eth_getLogs`.
///
/// Those two requests are started concurrently to reduce latency. The numeric-range log result is
/// only used as a fast path when it is non-empty and every returned log carries the fetched block
/// hash. If the range result is empty, has missing/mismatched metadata, or returns an RPC error,
/// the future falls back to a second `eth_getLogs` request pinned to the fetched block hash.
///
/// This means every successful output is internally consistent: `logs` are normalized to the
/// returned block number and block hash. A block-fetch error is returned directly; fallback log
/// errors are returned after the block has been fetched.
///
/// # Panics
///
/// Polling the future again after it has returned `Poll::Ready` panics.
#[pin_project]
#[derive(Debug)]
pub struct BlockLogsFut<N: Network> {
    /// Client used to issue the hash-pinned fallback request; `None` for pre-failed futures.
    client: Option<Arc<RpcClientInner>>,
    /// Requested block height; the fetched block's number is validated against it.
    block_number: u64,
    /// Base filter, cloned and pinned to the block hash for the fallback request.
    filter: Filter,
    /// Current step of the fetch state machine.
    #[pin]
    state: BlockLogsFutState<N>,
}
305
/// Internal state machine driving [`BlockLogsFut`].
#[pin_project(project = BlockLogsFutStateProj)]
#[derive(Debug)]
enum BlockLogsFutState<N: Network> {
    /// Polling the candidate block request and optimistic one-block range log request.
    ///
    /// Either RPC may finish first, so completed results are stored in `block` and `range_logs`
    /// until both are ready. The range log result is only accepted if it proves all logs
    /// belong to the fetched block hash; otherwise the future transitions to
    /// [`FetchLogs`](Self::FetchLogs).
    FetchBlockAndLogs {
        // Completed block result, already validated against the requested height.
        block: Option<TransportResult<N::BlockResponse>>,
        // Completed result of the optimistic numeric-range log request.
        range_logs: Option<TransportResult<Vec<Log>>>,
        // In-flight block request; no longer polled once `block` is `Some`.
        #[pin]
        block_fut: BlockFut<N::BlockResponse>,
        // In-flight range log request; no longer polled once `range_logs` is `Some`.
        #[pin]
        logs_call: RpcCall<(Filter,), Vec<Log>>,
    },
    /// Polling fallback logs pinned to the fetched block hash.
    ///
    /// This state is entered when the optimistic range result is empty, ambiguous, mismatched, or
    /// failed. A successful response is normalized against the stored block before the future
    /// resolves.
    FetchLogs {
        // The fetched block; taken out when the future resolves successfully.
        block: Option<N::BlockResponse>,
        // Number and hash of the fetched block, used to validate and normalize the logs.
        block_number: u64,
        block_hash: B256,
        // In-flight `eth_getLogs` request pinned to `block_hash`.
        #[pin]
        logs_call: RpcCall<(Filter,), Vec<Log>>,
    },
    /// Returning a prebuilt result, used when the parent stream cannot create a normal request.
    Ready { result: Option<TransportResult<BlockLogs<N>>> },
    /// Future has completed and must not be polled again.
    Complete,
}
340
341impl<N: Network> BlockLogsFut<N> {
342    fn new(
343        client: Arc<RpcClientInner>,
344        block_number: u64,
345        filter: Filter,
346        poll_interval: Duration,
347        kind: BlockTransactionsKind,
348    ) -> Self {
349        let block_fut = Self::block_fut(&client, block_number, poll_interval, kind);
350        let logs_call = Self::logs_call(&client, filter.clone().select(block_number));
351        Self {
352            client: Some(client),
353            block_number,
354            filter,
355            state: BlockLogsFutState::FetchBlockAndLogs {
356                block: None,
357                range_logs: None,
358                block_fut,
359                logs_call,
360            },
361        }
362    }
363
364    fn err(err: TransportError) -> Self {
365        Self {
366            client: None,
367            block_number: 0,
368            filter: Filter::new(),
369            state: BlockLogsFutState::Ready { result: Some(Err(err)) },
370        }
371    }
372
373    fn block_fut(
374        client: &Arc<RpcClientInner>,
375        block_number: u64,
376        poll_interval: Duration,
377        kind: BlockTransactionsKind,
378    ) -> BlockFut<N::BlockResponse> {
379        BlockFut::new(client.clone(), block_number, kind, poll_interval)
380    }
381
382    fn logs_call(client: &Arc<RpcClientInner>, filter: Filter) -> RpcCall<(Filter,), Vec<Log>> {
383        client.request("eth_getLogs", (filter,))
384    }
385}
386
impl<N: Network> Future for BlockLogsFut<N> {
    type Output = TransportResult<BlockLogs<N>>;

    /// Drives the block fetch and log fetch to completion.
    ///
    /// Resolves to `Ok(BlockLogs)` once the block and a set of logs validated against that
    /// block are available, or to the first unrecoverable error (block fetch failure, a block
    /// with an unexpected number, a dropped provider, or a failed/inconsistent fallback log
    /// request).
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // Loop so that a state transition is polled immediately instead of waiting for a wakeup.
        loop {
            match this.state.as_mut().project() {
                BlockLogsFutStateProj::FetchBlockAndLogs {
                    block,
                    range_logs,
                    block_fut,
                    logs_call,
                } => {
                    // Poll the block request only until its result has been captured.
                    if block.is_none() {
                        if let Poll::Ready(result) = block_fut.poll(cx) {
                            // Reject a response whose number differs from the requested height.
                            *block = Some(result.and_then(|block| {
                                let block_number = block.header().number();
                                if block_number != *this.block_number {
                                    Err(TransportErrorKind::custom_str(
                                        "eth_getBlockByNumber returned a block with an unexpected number",
                                    ))
                                } else {
                                    Ok(block)
                                }
                            }));
                        }
                    }

                    // Likewise, poll the range log request only until its result is captured.
                    if range_logs.is_none() {
                        if let Poll::Ready(result) = logs_call.poll(cx) {
                            *range_logs = Some(result);
                        }
                    }

                    // A block error is final: return it without waiting for the range logs.
                    if block.as_ref().is_some_and(Result::is_err) {
                        let Some(Err(err)) = block.take() else { unreachable!() };
                        this.state.set(BlockLogsFutState::Complete);
                        return Poll::Ready(Err(err));
                    }

                    // Both results are needed before the range logs can be judged.
                    if block.is_none() || range_logs.is_none() {
                        return Poll::Pending;
                    }

                    let Ok(block_result) = block.take().expect("checked block is ready") else {
                        unreachable!("block errors are handled above");
                    };
                    let logs_result = range_logs.take().expect("checked logs are ready");

                    let block_number = block_result.header().number();
                    let block_hash = block_result.header().hash();
                    // Fast path: non-empty range logs that all carry the fetched block hash.
                    // A range-log RPC error is not returned; it falls through to the fallback.
                    if let Ok(logs) = logs_result {
                        if let Some(logs) =
                            normalize_range_logs_if_matches(logs, block_number, block_hash)
                        {
                            this.state.set(BlockLogsFutState::Complete);
                            return Poll::Ready(Ok(BlockLogs { block: block_result, logs }));
                        }
                    }

                    // Fallback: re-query the logs pinned to the hash of the fetched block.
                    let Some(client) = this.client.as_ref() else {
                        this.state.set(BlockLogsFutState::Complete);
                        return Poll::Ready(Err(TransportError::local_usage_str(
                            "provider was dropped",
                        )));
                    };
                    let logs_call =
                        Self::logs_call(client, this.filter.clone().at_block_hash(block_hash));
                    this.state.set(BlockLogsFutState::FetchLogs {
                        block: Some(block_result),
                        block_number,
                        block_hash,
                        logs_call,
                    });
                }
                BlockLogsFutStateProj::FetchLogs { block, block_number, block_hash, logs_call } => {
                    // Validate and normalize the hash-pinned logs; a mismatch becomes an error.
                    match ready!(logs_call.poll(cx))
                        .and_then(|logs| normalize_logs(logs, *block_number, *block_hash))
                    {
                        Ok(logs) => {
                            let block = block.take().expect("block is present while fetching logs");
                            this.state.set(BlockLogsFutState::Complete);
                            return Poll::Ready(Ok(BlockLogs { block, logs }));
                        }
                        Err(err) => {
                            this.state.set(BlockLogsFutState::Complete);
                            return Poll::Ready(Err(err));
                        }
                    }
                }
                BlockLogsFutStateProj::Ready { result } => {
                    // Hand out the prebuilt result exactly once.
                    let result = result.take().expect("polled BlockLogsFut after completion");
                    this.state.set(BlockLogsFutState::Complete);
                    return Poll::Ready(result);
                }
                BlockLogsFutStateProj::Complete => panic!("polled BlockLogsFut after completion"),
            }
        }
    }
}
488
489/// Ensures all logs match the exact block queried and clears any stale removed flag.
490fn normalize_logs(
491    mut logs: Vec<Log>,
492    block_number: u64,
493    block_hash: B256,
494) -> TransportResult<Vec<Log>> {
495    for log in &mut logs {
496        if log.block_number.is_some_and(|number| number != block_number) {
497            return Err(TransportErrorKind::custom_str(
498                "eth_getLogs returned a log with an unexpected block number",
499            ));
500        }
501        if log.block_hash.is_some_and(|hash| hash != block_hash) {
502            return Err(TransportErrorKind::custom_str(
503                "eth_getLogs returned a log with an unexpected block hash",
504            ));
505        }
506        log.block_number = Some(block_number);
507        log.block_hash = Some(block_hash);
508        log.removed = false;
509    }
510    Ok(logs)
511}
512
513/// Accepts optimistic range logs only when they prove they belong to the fetched block hash.
514fn normalize_range_logs_if_matches(
515    logs: Vec<Log>,
516    block_number: u64,
517    block_hash: B256,
518) -> Option<Vec<Log>> {
519    if logs.is_empty() {
520        return None;
521    }
522
523    let all_logs_match = logs.iter().all(|log| {
524        log.block_hash == Some(block_hash)
525            && log.block_number.is_none_or(|number| number == block_number)
526    });
527    if !all_logs_match {
528        return None;
529    }
530
531    Some(
532        normalize_logs(logs, block_number, block_hash)
533            .expect("range logs were checked against the target block"),
534    )
535}
536
// Tests for the non-canonical log stream, driven by the mock chain from
// `watch_logs_test_utils`.
//
// NOTE(review): the helper argument meanings are not visible in this file. `block(a, b, c)`
// presumably takes (number, hash seed, parent hash seed), `log(a, b, c)` presumably
// (block number, block hash seed, log index), and `assert_batch(batch, a, b, c, d)` presumably
// checks (block number, hash seed, removed flag, log count). Confirm against the helper module.
#[cfg(test)]
mod tests {
    use super::{
        super::watch_logs_test_utils::{assert_batch, block, log, MockChain},
        *,
    };
    use crate::Provider;
    use alloy_network::Ethereum;
    use alloy_rpc_types_eth::Block;
    use futures::{Stream, StreamExt};
    use tokio::time::timeout;

    /// Waits up to one second for the next stream item and unwraps the timeout, the
    /// end-of-stream `Option`, and the transport result.
    async fn next_batch<S>(stream: &mut S) -> BlockLogs<Ethereum>
    where
        S: Stream<Item = TransportResult<BlockLogs<Ethereum>>> + Unpin,
    {
        timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap()
    }

    // Two blocks with one matching log each are delivered in order.
    #[tokio::test]
    async fn streams_log_batches_from_start_block() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)]), (block(2, 2, 1), vec![log(2, 2, 0)])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 1);
        assert_batch(&next_batch(&mut stream).await, 2, 2, false, 1);
        // Presumably one flag per `eth_getLogs` request, `true` when it was pinned to a block
        // hash; here both requests took the range fast path.
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, false]);
    }

    // An empty range result cannot prove which block it belongs to, so block 1 triggers a
    // second, hash-pinned request.
    #[tokio::test]
    async fn falls_back_to_block_hash_logs_for_empty_range_logs() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![]), (block(2, 2, 1), vec![log(2, 2, 0)])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 0);
        assert_batch(&next_batch(&mut stream).await, 2, 2, false, 1);
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, true, false]);
    }

    // Range logs carrying a different block hash are discarded in favour of a hash-pinned query.
    #[tokio::test]
    async fn falls_back_to_block_hash_logs_for_mismatched_range_logs() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)])]);
        chain.override_next_range_logs(vec![log(1, 11, 0)]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Number(1))
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 1);
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, true]);
    }

    // `.full()` and `.hashes()` control the transaction kind requested for the block fetch.
    #[tokio::test]
    async fn full_and_hashes_configure_block_fetch_kind() {
        let full_chain = MockChain::new();
        full_chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)])]);

        let provider = full_chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .full()
            .block_tag(BlockNumberOrTag::Number(1))
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 1);
        assert_eq!(full_chain.block_request_full_flags(), vec![true]);

        let hashes_chain = MockChain::new();
        hashes_chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)])]);

        let provider = hashes_chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .hashes()
            .block_tag(BlockNumberOrTag::Number(1))
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 1);
        assert_eq!(hashes_chain.block_request_full_flags(), vec![false]);
    }

    // With a retrying provider, one failed log request must not cause block 1 to be skipped.
    #[tokio::test]
    async fn provider_retry_layer_retries_log_error_without_skipping_block() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)]), (block(2, 2, 1), vec![log(2, 2, 0)])]);
        chain.fail_next_logs(1);

        let provider = chain.provider_with_retry();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 1);
        assert_batch(&next_batch(&mut stream).await, 2, 2, false, 1);
    }

    // A reorg after the log fetch does not alter the already-fetched batch for block 2: this
    // stream does no canonical reconciliation, so the old block 2 is still emitted, followed by
    // block 3 from the new chain.
    #[tokio::test]
    async fn emits_hash_pinned_candidate_when_chain_changes_after_log_fetch() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)]), (block(2, 2, 1), vec![log(2, 2, 0)])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 1);

        chain.reorg_after_next_log_success(vec![
            (block(2, 22, 1), vec![log(2, 22, 0)]),
            (block(3, 33, 22), vec![log(3, 33, 0)]),
        ]);

        assert_batch(&next_batch(&mut stream).await, 2, 2, false, 1);
        assert_batch(&next_batch(&mut stream).await, 3, 33, false, 1);
    }

    // A log without block metadata and with `removed: true` comes out normalized.
    #[tokio::test]
    async fn normalizes_log_metadata_and_clears_removed_flag() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![Log { removed: true, ..Default::default() }])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        assert_batch(&next_batch(&mut stream).await, 1, 1, false, 1);
    }

    // A log whose block number disagrees with the fetched block is rejected by the fallback
    // path with an error rather than being rewritten.
    #[tokio::test]
    async fn rejects_logs_with_conflicting_metadata() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(2, 1, 0)])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        let err =
            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap_err();
        assert!(format!("{err}").contains("unexpected block number"));
        assert_eq!(chain.log_request_block_hash_flags(), vec![false, true]);
    }

    // The stream holds only a weak client handle, so it ends once the provider is dropped.
    #[tokio::test]
    async fn stream_ends_when_provider_is_dropped() {
        let chain = MockChain::new();
        let provider = chain.provider();
        let mut stream = provider.watch_logs_from(0, &Filter::new()).into_stream();
        drop(provider);

        let next = timeout(Duration::from_secs(1), stream.next()).await.unwrap();
        assert!(next.is_none());
    }

    // A future that was already yielded owns a strong client handle and can still complete
    // after both the stream and the provider are gone.
    #[tokio::test]
    async fn yielded_future_outlives_provider() {
        let chain = MockChain::new();
        chain.extend(&[(block(1, 1, 0), vec![log(1, 1, 0)])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(1, &Filter::new())
            .block_tag(BlockNumberOrTag::Latest)
            .poll_interval(Duration::from_millis(1))
            .into_stream();

        let fut = timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap();
        drop(stream);
        drop(provider);

        let block_logs = timeout(Duration::from_secs(1), fut).await.unwrap().unwrap();
        assert_batch(&block_logs, 1, 1, false, 1);
    }

    // At `u64::MAX` the cursor cannot move forward, so the stream reports a local usage error.
    #[tokio::test]
    async fn errors_when_cursor_cannot_advance() {
        let chain = MockChain::new();
        let mut block: Block = block(u64::MAX, 1, 0);
        block.header.inner.number = u64::MAX;
        chain.extend(&[(block, vec![log(u64::MAX, 1, 0)])]);

        let provider = chain.provider();
        let mut stream = provider
            .watch_logs_from(u64::MAX, &Filter::new())
            .block_tag(BlockNumberOrTag::Number(u64::MAX))
            .poll_interval(Duration::from_millis(1))
            .into_stream()
            .buffered(1);

        let err =
            timeout(Duration::from_secs(1), stream.next()).await.unwrap().unwrap().unwrap_err();
        assert!(err.is_local_usage_error());
    }
}