// alloy-contract `event.rs`: helpers for building, querying, watching, and subscribing to
// contract event log filters.
1use crate::Error;
2use alloy_network::Ethereum;
3use alloy_primitives::{Address, LogData, B256};
4use alloy_provider::{FilterPollerBuilder, Network, Provider};
5use alloy_rpc_types_eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log, Topic, ValueOrArray};
6use alloy_sol_types::SolEvent;
7use alloy_transport::{BoxFuture, RpcError, TransportResult};
8use futures::Stream;
9use futures_util::StreamExt;
10use std::{fmt, marker::PhantomData};
11
/// Helper for managing the event filter before querying or streaming its logs
///
/// Type parameters: `P` is the provider, `E` is the [`SolEvent`] the logs decode into, and `N`
/// is the network (defaults to [`Ethereum`]).
#[must_use = "event filters do nothing unless you `query`, `watch`, or `stream` them"]
pub struct Event<P, E, N = Ethereum> {
    /// The provider to use for querying or streaming logs.
    pub provider: P,
    /// The filter to use for querying or streaming logs.
    pub filter: Filter,
    // Binds the event type `E` and network `N` without storing a value of either.
    _phantom: PhantomData<(E, N)>,
}
21
22impl<P: fmt::Debug, E, N> fmt::Debug for Event<P, E, N> {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("Event")
25            .field("provider", &self.provider)
26            .field("filter", &self.filter)
27            .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
28            .finish()
29    }
30}
31
#[doc(hidden)]
impl<'a, P: Provider<N>, E: SolEvent, N: Network> Event<&'a P, E, N> {
    // `sol!` macro constructor, see `#[sol(rpc)]`. Not public API.
    // NOTE: please avoid changing this function due to its use in the `sol!` macro.
    pub fn new_sol(provider: &'a P, address: &Address) -> Self {
        // keccak256 hash of the event signature needed for the filter to actually filter by event
        // check that the event is not anonymous to include the event signature in the filter
        // (anonymous events do not emit their signature hash as topic0, so filtering on it
        // would exclude every matching log).
        if E::ANONYMOUS {
            Self::new(provider, Filter::new().address(*address))
        } else {
            Self::new(provider, Filter::new().address(*address).event_signature(E::SIGNATURE_HASH))
        }
    }
}
46
47impl<P: Provider<N>, E: SolEvent, N: Network> Event<P, E, N> {
48    /// Creates a new event with the provided provider and filter.
49    pub const fn new(provider: P, filter: Filter) -> Self {
50        Self { provider, filter, _phantom: PhantomData }
51    }
52
53    /// Queries the blockchain for the selected filter and returns a vector of matching event logs.
54    pub async fn query(&self) -> Result<Vec<(E, Log)>, Error> {
55        let logs = self.query_raw().await?;
56        logs.into_iter().map(|log| Ok((decode_log(&log)?, log))).collect()
57    }
58
59    /// Queries the blockchain for the selected filter and returns a vector of matching event logs,
60    /// without decoding them.
61    pub async fn query_raw(&self) -> TransportResult<Vec<Log>> {
62        self.provider.get_logs(&self.filter).await
63    }
64
65    /// Watches for events that match the filter.
66    ///
67    /// Returns a stream of decoded events and raw logs.
68    #[doc(alias = "stream")]
69    #[doc(alias = "stream_with_meta")]
70    pub async fn watch(&self) -> TransportResult<EventPoller<E>> {
71        let poller = self.provider.watch_logs(&self.filter).await?;
72        Ok(poller.into())
73    }
74
75    /// Subscribes to the stream of events that match the filter.
76    ///
77    /// Returns a stream of decoded events and raw logs.
78    #[cfg(feature = "pubsub")]
79    pub async fn subscribe(&self) -> TransportResult<subscription::EventSubscription<E>> {
80        let sub = self.provider.subscribe_logs(&self.filter).await?;
81        Ok(sub.into())
82    }
83
84    /// Sets the inner filter object
85    ///
86    /// See [`Filter::select`].
87    pub fn select(mut self, filter: impl Into<FilterBlockOption>) -> Self {
88        self.filter.block_option = filter.into();
89        self
90    }
91
92    /// Sets the from block number
93    pub fn from_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
94        self.filter.block_option = self.filter.block_option.with_from_block(block.into());
95        self
96    }
97
98    /// Sets the to block number
99    pub fn to_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
100        self.filter.block_option = self.filter.block_option.with_to_block(block.into());
101        self
102    }
103
104    /// Return `true` if filter configured to match pending block.
105    ///
106    /// This means that both `from_block` and `to_block` are set to the pending
107    /// tag.
108    pub fn is_pending_block_filter(&self) -> bool {
109        self.filter.block_option.get_from_block().is_some_and(BlockNumberOrTag::is_pending)
110            && self.filter.block_option.get_to_block().is_some_and(BlockNumberOrTag::is_pending)
111    }
112
113    /// Pins the block hash for the filter
114    pub fn at_block_hash<A: Into<B256>>(mut self, hash: A) -> Self {
115        self.filter.block_option = self.filter.block_option.with_block_hash(hash.into());
116        self
117    }
118
119    /// Sets the address to query with this filter.
120    ///
121    /// See [`Filter::address`].
122    pub fn address<A: Into<ValueOrArray<Address>>>(mut self, address: A) -> Self {
123        self.filter.address = address.into().into();
124        self
125    }
126
127    /// Given the event signature in string form, it hashes it and adds it to the topics to monitor
128    pub fn event(mut self, event_name: &str) -> Self {
129        self.filter = self.filter.event(event_name);
130        self
131    }
132
133    /// Hashes all event signatures and sets them as array to event_signature(topic0)
134    pub fn events(mut self, events: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
135        self.filter = self.filter.events(events);
136        self
137    }
138
139    /// Sets event_signature(topic0) (the event name for non-anonymous events)
140    pub fn event_signature<TO: Into<Topic>>(mut self, topic: TO) -> Self {
141        self.filter.topics[0] = topic.into();
142        self
143    }
144
145    /// Sets the 1st indexed topic
146    pub fn topic1<TO: Into<Topic>>(mut self, topic: TO) -> Self {
147        self.filter.topics[1] = topic.into();
148        self
149    }
150
151    /// Sets the 2nd indexed topic
152    pub fn topic2<TO: Into<Topic>>(mut self, topic: TO) -> Self {
153        self.filter.topics[2] = topic.into();
154        self
155    }
156
157    /// Sets the 3rd indexed topic
158    pub fn topic3<TO: Into<Topic>>(mut self, topic: TO) -> Self {
159        self.filter.topics[3] = topic.into();
160        self
161    }
162
163    /// Returns a [`ChunkedEvent`] builder for querying large block ranges.
164    ///
165    /// The builder attempts the full block range optimistically first. If that fails, it splits
166    /// the range into `chunk_size`-block windows and queries them concurrently. If an individual
167    /// chunk still fails, it falls back to block-by-block queries for that chunk.
168    ///
169    /// # Example
170    ///
171    /// ```ignore
172    /// let logs = event
173    ///     .chunked()
174    ///     .chunk_size(1_000)
175    ///     .concurrent(10)
176    ///     .query()
177    ///     .await?;
178    /// ```
179    pub fn chunked(self) -> ChunkedEvent<P, E, N> {
180        ChunkedEvent {
181            provider: self.provider,
182            filter: self.filter,
183            chunk_size: 1_000,
184            max_concurrent: 5,
185            _phantom: PhantomData,
186        }
187    }
188}
189
190impl<P: Clone, E, N> Event<&P, E, N> {
191    /// Clones the provider and returns a new event with the cloned provider.
192    pub fn with_cloned_provider(self) -> Event<P, E, N> {
193        Event { provider: self.provider.clone(), filter: self.filter, _phantom: PhantomData }
194    }
195}
196
/// A chunked event query builder.
///
/// Created via [`Event::chunked`]. Configure chunk size and concurrency, then call
/// [`query`](ChunkedEvent::query) or [`query_raw`](ChunkedEvent::query_raw).
///
/// Attempts the full block range optimistically first. If that fails, the range is split into
/// `chunk_size`-block windows queried concurrently (bounded by `max_concurrent`). If an
/// individual chunk still fails, each block in that chunk is queried individually.
#[must_use = "ChunkedEvent does nothing unless you call `query` or `query_raw`"]
#[derive(Clone)]
pub struct ChunkedEvent<P, E, N = Ethereum> {
    // Provider used for all `eth_getLogs` requests.
    provider: P,
    // Base filter; per-chunk copies override its block range.
    filter: Filter,
    // Number of blocks per chunk window (default 1_000, set in `Event::chunked`).
    chunk_size: u64,
    // Upper bound on concurrently in-flight chunk requests (default 5).
    max_concurrent: usize,
    // Binds the event type `E` and network `N` without storing a value of either.
    _phantom: PhantomData<(E, N)>,
}
214
215impl<P: fmt::Debug, E, N> fmt::Debug for ChunkedEvent<P, E, N> {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        f.debug_struct("ChunkedEvent")
218            .field("provider", &self.provider)
219            .field("filter", &self.filter)
220            .field("chunk_size", &self.chunk_size)
221            .field("max_concurrent", &self.max_concurrent)
222            .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
223            .finish()
224    }
225}
226
227impl<P, E, N> ChunkedEvent<P, E, N> {
228    /// Sets the chunk size in blocks (default: 1 000).
229    ///
230    /// # Panics
231    ///
232    /// Panics if `chunk_size` is 0.
233    pub fn chunk_size(mut self, chunk_size: u64) -> Self {
234        assert!(chunk_size > 0, "chunk_size must be greater than 0");
235        self.chunk_size = chunk_size;
236        self
237    }
238
239    /// Sets the maximum number of concurrent chunk requests (default: 5).
240    ///
241    /// # Panics
242    ///
243    /// Panics if `max_concurrent` is 0.
244    pub fn concurrent(mut self, max_concurrent: usize) -> Self {
245        assert!(max_concurrent > 0, "max_concurrent must be greater than 0");
246        self.max_concurrent = max_concurrent;
247        self
248    }
249}
250
impl<P: Provider<N> + Clone + 'static, E: SolEvent + Send + Sync + 'static, N: Network>
    std::future::IntoFuture for ChunkedEvent<P, E, N>
{
    /// Decoded events paired with their raw logs, or the first error encountered.
    type Output = Result<Vec<(E, Log)>, Error>;
    type IntoFuture = BoxFuture<'static, Self::Output>;

    // Lets callers `.await` the builder directly; equivalent to calling `ChunkedEvent::query`.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move { self.query().await })
    }
}
261
impl<P: Provider<N> + Clone, E: SolEvent, N: Network> ChunkedEvent<P, E, N> {
    /// Queries the blockchain for matching event logs, decoding them.
    ///
    /// See [`query_raw`](Self::query_raw) for the full description of the chunked strategy.
    ///
    /// # Errors
    ///
    /// Fails if any chunk ultimately fails or if any returned log cannot be decoded as `E`.
    pub async fn query(&self) -> Result<Vec<(E, Log)>, Error> {
        let logs = self.query_raw().await?;
        // Decode each raw log into `E`, short-circuiting on the first decode failure.
        logs.into_iter().map(|log| Ok((decode_log(&log)?, log))).collect()
    }

    /// Queries the blockchain for matching event logs, without decoding.
    ///
    /// Attempts the full block range optimistically first. If that fails, splits the range into
    /// `chunk_size`-block windows queried concurrently (up to `max_concurrent` at a time). If an
    /// individual chunk still fails, falls back to querying each block individually.
    pub async fn query_raw(&self) -> TransportResult<Vec<Log>> {
        // Optimistic path: one request for the whole range. Any error (e.g. a provider-side
        // range or result-count limit) triggers the chunked fallback below.
        if let Ok(logs) = self.provider.get_logs(&self.filter).await {
            return Ok(logs);
        }
        self.get_logs_chunked().await
    }

    /// Divides the block range into chunks and queries them concurrently, falling back to
    /// single-block queries for any chunk that fails.
    async fn get_logs_chunked(&self) -> TransportResult<Vec<Log>> {
        // Chunking needs a numeric block range; a block-hash filter pins exactly one block.
        let FilterBlockOption::Range { from_block, to_block } = self.filter.block_option else {
            return Err(RpcError::local_usage_str(
                "chunked queries require a block range filter, not a block hash filter",
            ));
        };

        // Resolve symbolic tags to concrete numbers; unset bounds default to the full chain.
        let from =
            resolve_block_tag(&self.provider, from_block.unwrap_or(BlockNumberOrTag::Earliest))
                .await?;
        let to =
            resolve_block_tag(&self.provider, to_block.unwrap_or(BlockNumberOrTag::Latest)).await?;

        // Inverted (empty) range: nothing to query.
        if from > to {
            return Ok(vec![]);
        }

        // Collect every chunk's result; `buffer_unordered` means completion order is arbitrary.
        let all_results: Vec<TransportResult<(u64, Vec<Log>)>> =
            self.chunk_stream(from, to).collect().await;

        // Propagate the first error, otherwise keep each chunk's (start block, logs) pair.
        let mut resolved: Vec<(u64, Vec<Log>)> =
            all_results.into_iter().collect::<TransportResult<Vec<_>>>()?;

        // Restore ascending block order before flattening into a single log list.
        resolved.sort_by_key(|(block_num, _)| *block_num);
        Ok(resolved.into_iter().flat_map(|(_, logs)| logs).collect())
    }

    /// Returns a stream of per-chunk results over `[from, to]`.
    ///
    /// Each item is the result of querying one `chunk_size`-block window, falling back to
    /// single-block queries if the chunk request fails. Results may arrive out of order due to
    /// concurrent dispatch; callers are responsible for sorting if order matters.
    fn chunk_stream(
        &self,
        from: u64,
        to: u64,
    ) -> impl Stream<Item = TransportResult<(u64, Vec<Log>)>> {
        // Owned copies so the returned stream (and its futures) don't borrow `self`.
        let filter = self.filter.clone();
        let provider = self.provider.clone();
        let max_concurrent = self.max_concurrent;

        futures::stream::iter(chunk_ranges(from, to, self.chunk_size))
            .map(move |(start_block, end_block)| {
                // Narrow the base filter down to this chunk's window.
                let chunk_filter = filter.clone().from_block(start_block).to_block(end_block);
                let provider = provider.clone();
                async move { query_chunk(&provider, &chunk_filter, start_block, end_block).await }
            })
            // Drive up to `max_concurrent` chunk queries at once, yielding as they complete.
            .buffer_unordered(max_concurrent)
    }
}
335
/// Lazily generates inclusive `(start, end)` block pairs covering `[from, to]` in
/// `chunk_size`-block windows, avoiding materializing huge ranges in memory.
///
/// Fixes two issues with the previous version: the chunk end is now computed once per chunk
/// instead of once in the successor closure and again in `map`, and `chunk_size == 0` no longer
/// underflows `chunk_size - 1` (a debug-build panic); it is clamped to behave like 1. Callers
/// currently guarantee `chunk_size >= 1` via `ChunkedEvent::chunk_size`, so observable behavior
/// for existing callers is unchanged.
fn chunk_ranges(from: u64, to: u64, chunk_size: u64) -> impl Iterator<Item = (u64, u64)> {
    // Width added to a chunk's start to reach its inclusive end; clamp avoids u64 underflow.
    let step = chunk_size.max(1) - 1;
    let mut next_start = Some(from);
    std::iter::from_fn(move || {
        let start = next_start?;
        let end = start.saturating_add(step).min(to);
        // Stop after the chunk that reaches `to`; `checked_add` guards against u64::MAX overflow.
        next_start = if end >= to { None } else { end.checked_add(1) };
        Some((start, end))
    })
}
348
/// Queries a single chunk of blocks, falling back to block-by-block queries if the chunk fails.
///
/// On success returns `(start_block, logs)` so the caller can re-order chunk results by their
/// starting block.
///
/// # Errors
///
/// The initial chunk-wide failure is only logged; an error is returned only if one of the
/// single-block fallback queries itself fails.
async fn query_chunk<P: Provider<N>, N: Network>(
    provider: &P,
    filter: &Filter,
    start_block: u64,
    end_block: u64,
) -> TransportResult<(u64, Vec<Log>)> {
    match provider.get_logs(filter).await {
        Ok(logs) => Ok((start_block, logs)),
        Err(err) => {
            tracing::debug!(
                %err,
                start_block,
                end_block,
                "chunk query failed, falling back to single-block queries"
            );
            // Last-resort path: one request per block in the chunk's inclusive range.
            let mut fallback_logs = Vec::new();
            for block in start_block..=end_block {
                let single_filter = filter.clone().from_block(block).to_block(block);
                fallback_logs.extend(provider.get_logs(&single_filter).await?);
            }
            Ok((start_block, fallback_logs))
        }
    }
}
374
/// An event poller.
///
/// Polling configuration is available through the [`poller`](Self::poller) field.
pub struct EventPoller<E> {
    /// The inner poller.
    pub poller: FilterPollerBuilder<Log>,
    // Binds the event type `E` decoded by `into_stream` without storing a value of it.
    _phantom: PhantomData<E>,
}
383
// Cheap shared access to the underlying poller builder.
impl<E> AsRef<FilterPollerBuilder<Log>> for EventPoller<E> {
    #[inline]
    fn as_ref(&self) -> &FilterPollerBuilder<Log> {
        &self.poller
    }
}
390
// Mutable access to the underlying poller builder, e.g. to tweak the poll interval.
impl<E> AsMut<FilterPollerBuilder<Log>> for EventPoller<E> {
    #[inline]
    fn as_mut(&mut self) -> &mut FilterPollerBuilder<Log> {
        &mut self.poller
    }
}
397
398impl<E> fmt::Debug for EventPoller<E> {
399    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
400        f.debug_struct("EventPoller")
401            .field("poller", &self.poller)
402            .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
403            .finish()
404    }
405}
406
// Wraps a raw log poller, tagging it with the event type to decode into.
impl<E> From<FilterPollerBuilder<Log>> for EventPoller<E> {
    fn from(poller: FilterPollerBuilder<Log>) -> Self {
        Self { poller, _phantom: PhantomData }
    }
}
412
impl<E: SolEvent> EventPoller<E> {
    /// Starts the poller and returns a stream that yields the decoded event and the raw log.
    ///
    /// Note that this stream will not return `None` until the provider is dropped.
    pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
        self.poller
            .into_stream()
            // Each poll yields a batch of logs; flatten batches into individual logs.
            .flat_map(futures_util::stream::iter)
            // Pair each decoded event with the raw log it came from; decode errors pass through.
            .map(|log| decode_log(&log).map(|e| (e, log)))
    }
}
424
425/// Resolves a [`BlockNumberOrTag`] to a concrete block number.
426///
427/// Returns `0` for [`BlockNumberOrTag::Earliest`] and fetches the latest block number from the
428/// provider for any other non-numeric tag (e.g. `Latest`, `Pending`, `Safe`, `Finalized`).
429async fn resolve_block_tag<P: Provider<N>, N: Network>(
430    provider: &P,
431    tag: BlockNumberOrTag,
432) -> TransportResult<u64> {
433    match tag.as_number() {
434        Some(n) => Ok(n),
435        None if tag == BlockNumberOrTag::Earliest => Ok(0),
436        None => provider.get_block_number().await,
437    }
438}
439
440fn decode_log<E: SolEvent>(log: &Log) -> alloy_sol_types::Result<E> {
441    let log_data: &LogData = log.as_ref();
442
443    E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data)
444}
445
#[cfg(feature = "pubsub")]
pub(crate) mod subscription {
    use super::*;
    use alloy_pubsub::Subscription;

    /// An event subscription.
    ///
    /// Underlying subscription is available through the [`sub`](Self::sub) field.
    pub struct EventSubscription<E> {
        /// The inner subscription.
        pub sub: Subscription<Log>,
        // Binds the event type `E` decoded by `into_stream` without storing a value of it.
        _phantom: PhantomData<E>,
    }

    // Cheap shared access to the underlying subscription.
    impl<E> AsRef<Subscription<Log>> for EventSubscription<E> {
        #[inline]
        fn as_ref(&self) -> &Subscription<Log> {
            &self.sub
        }
    }

    // Mutable access to the underlying subscription.
    impl<E> AsMut<Subscription<Log>> for EventSubscription<E> {
        #[inline]
        fn as_mut(&mut self) -> &mut Subscription<Log> {
            &mut self.sub
        }
    }

    impl<E> fmt::Debug for EventSubscription<E> {
        // Prints the event type's name in place of the zero-sized `E` parameter.
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.debug_struct("EventSubscription")
                .field("sub", &self.sub)
                .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
                .finish()
        }
    }

    // Wraps a raw log subscription, tagging it with the event type to decode into.
    impl<E> From<Subscription<Log>> for EventSubscription<E> {
        fn from(sub: Subscription<Log>) -> Self {
            Self { sub, _phantom: PhantomData }
        }
    }

    impl<E: SolEvent> EventSubscription<E> {
        /// Converts the subscription into a stream.
        ///
        /// Each item pairs the decoded event with the raw log it came from; decode errors are
        /// yielded as `Err` items rather than terminating the stream.
        pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
            self.sub.into_stream().map(|log| decode_log(&log).map(|e| (e, log)))
        }
    }
}
496
497#[cfg(test)]
498mod tests {
499    use super::*;
500    use alloy_network::EthereumWallet;
501    use alloy_primitives::U256;
502    use alloy_signer_local::PrivateKeySigner;
503    use alloy_sol_types::sol;
504
    // Test fixture: a contract with two structurally identical events so tests can verify that
    // filtering by event signature distinguishes `MyEvent` from `WrongEvent`.
    sol! {
        // solc v0.8.24; solc a.sol --via-ir --optimize --bin
        #[sol(rpc, bytecode = "60808060405234601557610147908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c908163299d8665146100a7575063ffdf4f1b14610032575f80fd5b346100a3575f3660031901126100a357602a7f6d10b8446ff0ac11bb95d154e7b10a73042fb9fc3bca0c92de5397b2fe78496c6040518061009e819060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a0840193600160208201520152565b0390a2005b5f80fd5b346100a3575f3660031901126100a3577f4e4cd44610926680098f1b54e2bdd1fb952659144c471173bbb9cf966af3a988818061009e602a949060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a084019360016020820152015256fea26469706673582212202e640cd14a7310d4165f902d2721ef5b4640a08f5ae38e9ae5c315a9f9f4435864736f6c63430008190033")]
        #[allow(dead_code)]
        contract MyContract {
            #[derive(Debug, PartialEq, Eq)]
            event MyEvent(uint64 indexed, string, bool, bytes32);

            #[derive(Debug, PartialEq, Eq)]
            event WrongEvent(uint64 indexed, string, bool, bytes32);

            function doEmit() external {
                emit MyEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
            }

            function doEmitWrongEvent() external {
                emit WrongEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
            }
        }
    }
525
    // End-to-end test of the `sol!`-generated filter: query, watch (polling), and — with the
    // `pubsub` feature — subscribe, against a local anvil node.
    #[tokio::test]
    async fn event_filters() {
        let _ = tracing_subscriber::fmt::try_init();

        let anvil = alloy_node_bindings::Anvil::new().spawn();

        // Anvil's first default dev-account private key.
        let pk: PrivateKeySigner =
            "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
        let wallet = EthereumWallet::from(pk);
        let provider = alloy_provider::ProviderBuilder::new()
            .wallet(wallet.clone())
            .connect_http(anvil.endpoint_url());

        // let from = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
        let contract = MyContract::deploy(&provider).await.unwrap();

        // Nothing emitted yet, so even an unrestricted filter must match no logs.
        let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new());
        let all = event.query().await.unwrap();
        assert_eq!(all.len(), 0);

        // Same as above, but generated by `sol!`.
        let event = contract.MyEvent_filter();

        let poller = event.watch().await.unwrap();

        let _receipt =
            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");

        let expected_event = MyContract::MyEvent {
            _0: 42,
            _1: "hello".to_string(),
            _2: true,
            _3: U256::from(0xdeadbeefu64).into(),
        };

        let mut stream = poller.into_stream();
        let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); // add check that the received event signature is the same as the one we expect
        assert_eq!(stream_event, expected_event);
        assert_eq!(stream_log.inner.address, *contract.address());
        assert_eq!(stream_log.block_number, Some(2));

        // This is not going to return `None`
        // assert!(stream.next().await.is_none());

        // The query path must see the same single log the poller streamed.
        let all = event.query().await.unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].0, expected_event);
        assert_eq!(all[0].1, stream_log);

        // send the wrong event and make sure it is NOT picked up by the event filter
        let _wrong_receipt = contract
            .doEmitWrongEvent()
            .send()
            .await
            .unwrap()
            .get_receipt()
            .await
            .expect("no receipt");

        // we sent the wrong event
        // so no events should be returned when querying event.query() (MyEvent)
        let all = event.query().await.unwrap();
        assert_eq!(all.len(), 0);

        #[cfg(feature = "pubsub")]
        {
            // Repeat the scenario over websockets using `subscribe` instead of polling.
            let provider = alloy_provider::ProviderBuilder::new()
                .wallet(wallet)
                .connect(&anvil.ws_endpoint())
                .await
                .unwrap();

            let contract = MyContract::new(*contract.address(), provider);
            let event = contract.MyEvent_filter();

            let sub = event.subscribe().await.unwrap();

            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");

            let mut stream = sub.into_stream();

            let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
            assert_eq!(
                MyContract::MyEvent::SIGNATURE_HASH.0,
                stream_log.topics().first().unwrap().0
            );
            assert_eq!(stream_event, expected_event);
            assert_eq!(stream_log.address(), *contract.address());
            assert_eq!(stream_log.block_number, Some(4));

            // send the request to emit the wrong event
            contract
                .doEmitWrongEvent()
                .send()
                .await
                .unwrap()
                .get_receipt()
                .await
                .expect("no receipt");

            // we sent the wrong event
            // so no events should be returned when querying event.query() (MyEvent)
            let all = event.query().await.unwrap();
            assert_eq!(all.len(), 0);
        }
    }
633
    /// Same test as above, but using builder methods.
    ///
    /// Exercises `Event::address` and `Event::event_signature` directly instead of relying on
    /// the `sol!`-generated `MyEvent_filter()` constructor.
    #[tokio::test]
    async fn event_builder_filters() {
        let _ = tracing_subscriber::fmt::try_init();

        let anvil = alloy_node_bindings::Anvil::new().spawn();
        // Anvil's first default dev-account private key.
        let pk: PrivateKeySigner =
            "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
        let wallet = EthereumWallet::from(pk);
        let provider = alloy_provider::ProviderBuilder::new()
            .wallet(wallet.clone())
            .connect_http(anvil.endpoint_url());

        let contract = MyContract::deploy(&provider).await.unwrap();

        // Hand-built filter equivalent to `contract.MyEvent_filter()`.
        let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
            .address(*contract.address())
            .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
        let all = event.query().await.unwrap();
        assert_eq!(all.len(), 0);

        let poller = event.watch().await.unwrap();

        let _receipt =
            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");

        let expected_event = MyContract::MyEvent {
            _0: 42,
            _1: "hello".to_string(),
            _2: true,
            _3: U256::from(0xdeadbeefu64).into(),
        };

        let mut stream = poller.into_stream();
        let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); // add check that the received event signature is the same as the one we expect
        assert_eq!(stream_event, expected_event);
        assert_eq!(stream_log.inner.address, *contract.address());
        assert_eq!(stream_log.block_number, Some(2));

        // This is not going to return `None`
        // assert!(stream.next().await.is_none());

        let all = event.query().await.unwrap();
        assert_eq!(all.len(), 1);
        assert_eq!(all[0].0, expected_event);
        assert_eq!(all[0].1, stream_log);

        // send the wrong event and make sure it is NOT picked up by the event filter
        let _wrong_receipt = contract
            .doEmitWrongEvent()
            .send()
            .await
            .unwrap()
            .get_receipt()
            .await
            .expect("no receipt");

        // we sent the wrong event
        // so no events should be returned when querying event.query() (MyEvent)
        let all = event.query().await.unwrap();
        assert_eq!(all.len(), 0);

        #[cfg(feature = "pubsub")]
        {
            // Repeat the scenario over websockets using `subscribe` instead of polling.
            let provider = alloy_provider::ProviderBuilder::new()
                .wallet(wallet)
                .connect(&anvil.ws_endpoint())
                .await
                .unwrap();

            let contract = MyContract::new(*contract.address(), &provider);
            let event: Event<_, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
                .address(*contract.address())
                .event_signature(MyContract::MyEvent::SIGNATURE_HASH);

            let sub = event.subscribe().await.unwrap();

            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");

            let mut stream = sub.into_stream();

            let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
            assert_eq!(
                MyContract::MyEvent::SIGNATURE_HASH.0,
                stream_log.topics().first().unwrap().0
            );
            assert_eq!(stream_event, expected_event);
            assert_eq!(stream_log.address(), *contract.address());
            assert_eq!(stream_log.block_number, Some(4));

            // send the request to emit the wrong event
            contract
                .doEmitWrongEvent()
                .send()
                .await
                .unwrap()
                .get_receipt()
                .await
                .expect("no receipt");

            // we sent the wrong event
            // so no events should be returned when querying event.query() (MyEvent)
            let all = event.query().await.unwrap();
            assert_eq!(all.len(), 0);
        }
    }
741
    /// Verifies that the chunked query algorithm collects the correct logs and preserves
    /// block ordering when events are spread across a range requiring multiple chunks.
    #[tokio::test]
    async fn chunked_query_collects_and_orders_logs() {
        use alloy_provider::ext::AnvilApi;

        let _ = tracing_subscriber::fmt::try_init();

        let anvil = alloy_node_bindings::Anvil::new().spawn();
        // Anvil's first default dev-account private key.
        let pk: PrivateKeySigner =
            "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
        let provider = alloy_provider::ProviderBuilder::new()
            .wallet(EthereumWallet::from(pk))
            .connect_http(anvil.endpoint_url());

        let contract = MyContract::deploy(&provider).await.unwrap();

        // Emit three events spread across a 100-block range: start, middle, and end.
        // This exercises chunk boundary handling and result ordering after the merge.
        contract.doEmit().send().await.unwrap().get_receipt().await.unwrap();
        provider.anvil_mine(Some(48), None).await.unwrap();
        contract.doEmit().send().await.unwrap().get_receipt().await.unwrap();
        provider.anvil_mine(Some(48), None).await.unwrap();
        contract.doEmit().send().await.unwrap().get_receipt().await.unwrap();

        // Use chunk_size=7 to force ~15 chunks, exercising the concurrent dispatch and merge.
        // get_logs_chunked() bypasses the optimistic full-range attempt so the chunking code
        // is always exercised. query_raw() serves as the reference via the optimistic path.
        let event =
            contract.MyEvent_filter().from_block(0u64).to_block(100u64).chunked().chunk_size(7);

        let chunked = event.get_logs_chunked().await.unwrap();
        let reference = event.query_raw().await.unwrap();

        assert_eq!(chunked.len(), 3, "expected exactly 3 events across all chunks");
        assert_eq!(chunked, reference, "chunked result must match full-range query");
    }
779}