alloy_contract/
event.rs

1use crate::Error;
2use alloy_network::Ethereum;
3use alloy_primitives::{Address, LogData, B256};
4use alloy_provider::{FilterPollerBuilder, Network, Provider};
5use alloy_rpc_types_eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log, Topic, ValueOrArray};
6use alloy_sol_types::SolEvent;
7use alloy_transport::TransportResult;
8use futures::Stream;
9use futures_util::StreamExt;
10use std::{fmt, marker::PhantomData};
11
12/// Helper for managing the event filter before querying or streaming its logs
13#[must_use = "event filters do nothing unless you `query`, `watch`, or `stream` them"]
14pub struct Event<T, P, E, N = Ethereum> {
15    /// The provider to use for querying or streaming logs.
16    pub provider: P,
17    /// The filter to use for querying or streaming logs.
18    pub filter: Filter,
19    _phantom: PhantomData<(T, E, N)>,
20}
21
22impl<T, P: fmt::Debug, E, N> fmt::Debug for Event<T, P, E, N> {
23    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24        f.debug_struct("Event")
25            .field("provider", &self.provider)
26            .field("filter", &self.filter)
27            .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
28            .finish()
29    }
30}
31
32#[doc(hidden)]
33impl<'a, T: crate::private::Transport, P: Provider<N>, E: SolEvent, N: Network>
34    Event<T, &'a P, E, N>
35{
36    // `sol!` macro constructor, see `#[sol(rpc)]`. Not public API.
37    // NOTE: please avoid changing this function due to its use in the `sol!` macro.
38    pub fn new_sol(provider: &'a P, address: &Address) -> Self {
39        // keccak256 hash of the event signature needed for the filter to actually filter by event
40        // check that the event is not anonymous to include the event signature in the filter
41        if E::ANONYMOUS {
42            Self::new(provider, Filter::new().address(*address))
43        } else {
44            Self::new(provider, Filter::new().address(*address).event_signature(E::SIGNATURE_HASH))
45        }
46    }
47}
48
49impl<T, P: Provider<N>, E: SolEvent, N: Network> Event<T, P, E, N> {
50    /// Creates a new event with the provided provider and filter.
51    pub const fn new(provider: P, filter: Filter) -> Self {
52        Self { provider, filter, _phantom: PhantomData }
53    }
54
55    /// Queries the blockchain for the selected filter and returns a vector of matching event logs.
56    pub async fn query(&self) -> Result<Vec<(E, Log)>, Error> {
57        let logs = self.query_raw().await?;
58        logs.into_iter().map(|log| Ok((decode_log(&log)?, log))).collect()
59    }
60
61    /// Queries the blockchain for the selected filter and returns a vector of matching event logs,
62    /// without decoding them.
63    pub async fn query_raw(&self) -> TransportResult<Vec<Log>> {
64        self.provider.get_logs(&self.filter).await
65    }
66
67    /// Watches for events that match the filter.
68    ///
69    /// Returns a stream of decoded events and raw logs.
70    #[doc(alias = "stream")]
71    #[doc(alias = "stream_with_meta")]
72    pub async fn watch(&self) -> TransportResult<EventPoller<E>> {
73        let poller = self.provider.watch_logs(&self.filter).await?;
74        Ok(poller.into())
75    }
76
77    /// Subscribes to the stream of events that match the filter.
78    ///
79    /// Returns a stream of decoded events and raw logs.
80    #[cfg(feature = "pubsub")]
81    pub async fn subscribe(&self) -> TransportResult<subscription::EventSubscription<E>> {
82        let sub = self.provider.subscribe_logs(&self.filter).await?;
83        Ok(sub.into())
84    }
85
86    /// Sets the inner filter object
87    ///
88    /// See [`Filter::select`].
89    pub fn select(mut self, filter: impl Into<FilterBlockOption>) -> Self {
90        self.filter.block_option = filter.into();
91        self
92    }
93
94    /// Sets the from block number
95    pub fn from_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
96        self.filter.block_option = self.filter.block_option.with_from_block(block.into());
97        self
98    }
99
100    /// Sets the to block number
101    pub fn to_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
102        self.filter.block_option = self.filter.block_option.with_to_block(block.into());
103        self
104    }
105
106    /// Return `true` if filter configured to match pending block.
107    ///
108    /// This means that both `from_block` and `to_block` are set to the pending
109    /// tag.
110    pub fn is_pending_block_filter(&self) -> bool {
111        self.filter.block_option.get_from_block().is_some_and(BlockNumberOrTag::is_pending)
112            && self.filter.block_option.get_to_block().is_some_and(BlockNumberOrTag::is_pending)
113    }
114
115    /// Pins the block hash for the filter
116    pub fn at_block_hash<A: Into<B256>>(mut self, hash: A) -> Self {
117        self.filter.block_option = self.filter.block_option.with_block_hash(hash.into());
118        self
119    }
120
121    /// Sets the address to query with this filter.
122    ///
123    /// See [`Filter::address`].
124    pub fn address<A: Into<ValueOrArray<Address>>>(mut self, address: A) -> Self {
125        self.filter.address = address.into().into();
126        self
127    }
128
129    /// Given the event signature in string form, it hashes it and adds it to the topics to monitor
130    pub fn event(mut self, event_name: &str) -> Self {
131        self.filter = self.filter.event(event_name);
132        self
133    }
134
135    /// Hashes all event signatures and sets them as array to event_signature(topic0)
136    pub fn events(mut self, events: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
137        self.filter = self.filter.events(events);
138        self
139    }
140
141    /// Sets event_signature(topic0) (the event name for non-anonymous events)
142    pub fn event_signature<TO: Into<Topic>>(mut self, topic: TO) -> Self {
143        self.filter.topics[0] = topic.into();
144        self
145    }
146
147    /// Sets the 1st indexed topic
148    pub fn topic1<TO: Into<Topic>>(mut self, topic: TO) -> Self {
149        self.filter.topics[1] = topic.into();
150        self
151    }
152
153    /// Sets the 2nd indexed topic
154    pub fn topic2<TO: Into<Topic>>(mut self, topic: TO) -> Self {
155        self.filter.topics[2] = topic.into();
156        self
157    }
158
159    /// Sets the 3rd indexed topic
160    pub fn topic3<TO: Into<Topic>>(mut self, topic: TO) -> Self {
161        self.filter.topics[3] = topic.into();
162        self
163    }
164}
165
166impl<T, P: Clone, E, N> Event<T, &P, E, N> {
167    /// Clones the provider and returns a new event with the cloned provider.
168    pub fn with_cloned_provider(self) -> Event<T, P, E, N> {
169        Event { provider: self.provider.clone(), filter: self.filter, _phantom: PhantomData }
170    }
171}
172
173/// An event poller.
174///
175/// Polling configuration is available through the [`poller`](Self::poller) field.
176pub struct EventPoller<E> {
177    /// The inner poller.
178    pub poller: FilterPollerBuilder<Log>,
179    _phantom: PhantomData<E>,
180}
181
182impl<E> AsRef<FilterPollerBuilder<Log>> for EventPoller<E> {
183    #[inline]
184    fn as_ref(&self) -> &FilterPollerBuilder<Log> {
185        &self.poller
186    }
187}
188
189impl<E> AsMut<FilterPollerBuilder<Log>> for EventPoller<E> {
190    #[inline]
191    fn as_mut(&mut self) -> &mut FilterPollerBuilder<Log> {
192        &mut self.poller
193    }
194}
195
196impl<E> fmt::Debug for EventPoller<E> {
197    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198        f.debug_struct("EventPoller")
199            .field("poller", &self.poller)
200            .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
201            .finish()
202    }
203}
204
205impl<E> From<FilterPollerBuilder<Log>> for EventPoller<E> {
206    fn from(poller: FilterPollerBuilder<Log>) -> Self {
207        Self { poller, _phantom: PhantomData }
208    }
209}
210
211impl<E: SolEvent> EventPoller<E> {
212    /// Starts the poller and returns a stream that yields the decoded event and the raw log.
213    ///
214    /// Note that this stream will not return `None` until the provider is dropped.
215    pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
216        self.poller
217            .into_stream()
218            .flat_map(futures_util::stream::iter)
219            .map(|log| decode_log(&log).map(|e| (e, log)))
220    }
221}
222
223fn decode_log<E: SolEvent>(log: &Log) -> alloy_sol_types::Result<E> {
224    let log_data: &LogData = log.as_ref();
225
226    E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data, false)
227}
228
229#[cfg(feature = "pubsub")]
230pub(crate) mod subscription {
231    use super::*;
232    use alloy_pubsub::Subscription;
233
234    /// An event subscription.
235    ///
236    /// Underlying subscription is available through the [`sub`](Self::sub) field.
237    pub struct EventSubscription<E> {
238        /// The inner poller.
239        pub sub: Subscription<Log>,
240        _phantom: PhantomData<E>,
241    }
242
243    impl<E> AsRef<Subscription<Log>> for EventSubscription<E> {
244        #[inline]
245        fn as_ref(&self) -> &Subscription<Log> {
246            &self.sub
247        }
248    }
249
250    impl<E> AsMut<Subscription<Log>> for EventSubscription<E> {
251        #[inline]
252        fn as_mut(&mut self) -> &mut Subscription<Log> {
253            &mut self.sub
254        }
255    }
256
257    impl<E> fmt::Debug for EventSubscription<E> {
258        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259            f.debug_struct("EventSubscription")
260                .field("sub", &self.sub)
261                .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
262                .finish()
263        }
264    }
265
266    impl<E> From<Subscription<Log>> for EventSubscription<E> {
267        fn from(sub: Subscription<Log>) -> Self {
268            Self { sub, _phantom: PhantomData }
269        }
270    }
271
272    impl<E: SolEvent> EventSubscription<E> {
273        /// Converts the subscription into a stream.
274        pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
275            self.sub.into_stream().map(|log| decode_log(&log).map(|e| (e, log)))
276        }
277    }
278}
279
280#[cfg(test)]
281mod tests {
282    use super::*;
283    use alloy_network::EthereumWallet;
284    use alloy_primitives::U256;
285    use alloy_signer_local::PrivateKeySigner;
286    use alloy_sol_types::sol;
287
288    sol! {
289        // solc v0.8.24; solc a.sol --via-ir --optimize --bin
290        #[sol(rpc, bytecode = "60808060405234601557610147908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c908163299d8665146100a7575063ffdf4f1b14610032575f80fd5b346100a3575f3660031901126100a357602a7f6d10b8446ff0ac11bb95d154e7b10a73042fb9fc3bca0c92de5397b2fe78496c6040518061009e819060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a0840193600160208201520152565b0390a2005b5f80fd5b346100a3575f3660031901126100a3577f4e4cd44610926680098f1b54e2bdd1fb952659144c471173bbb9cf966af3a988818061009e602a949060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a084019360016020820152015256fea26469706673582212202e640cd14a7310d4165f902d2721ef5b4640a08f5ae38e9ae5c315a9f9f4435864736f6c63430008190033")]
291        #[allow(dead_code)]
292        contract MyContract {
293            #[derive(Debug, PartialEq, Eq)]
294            event MyEvent(uint64 indexed, string, bool, bytes32);
295
296            #[derive(Debug, PartialEq, Eq)]
297            event WrongEvent(uint64 indexed, string, bool, bytes32);
298
299            function doEmit() external {
300                emit MyEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
301            }
302
303            function doEmitWrongEvent() external {
304                emit WrongEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
305            }
306        }
307    }
308
309    #[tokio::test]
310    async fn event_filters() {
311        let _ = tracing_subscriber::fmt::try_init();
312
313        let anvil = alloy_node_bindings::Anvil::new().spawn();
314
315        let pk: PrivateKeySigner =
316            "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
317        let wallet = EthereumWallet::from(pk);
318        let provider = alloy_provider::ProviderBuilder::new()
319            .wallet(wallet.clone())
320            .on_http(anvil.endpoint_url());
321
322        // let from = address!("f39Fd6e51aad88F6F4ce6aB8827279cffFb92266");
323        let contract = MyContract::deploy(&provider).await.unwrap();
324
325        let event: Event<(), _, MyContract::MyEvent, _> = Event::new(&provider, Filter::new());
326        let all = event.query().await.unwrap();
327        assert_eq!(all.len(), 0);
328
329        // Same as above, but generated by `sol!`.
330        let event = contract.MyEvent_filter();
331
332        let poller = event.watch().await.unwrap();
333
334        let _receipt =
335            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
336
337        let expected_event = MyContract::MyEvent {
338            _0: 42,
339            _1: "hello".to_string(),
340            _2: true,
341            _3: U256::from(0xdeadbeefu64).into(),
342        };
343
344        let mut stream = poller.into_stream();
345        let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
346        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); // add check that the received event signature is the same as the one we expect
347        assert_eq!(stream_event, expected_event);
348        assert_eq!(stream_log.inner.address, *contract.address());
349        assert_eq!(stream_log.block_number, Some(2));
350
351        // This is not going to return `None`
352        // assert!(stream.next().await.is_none());
353
354        let all = event.query().await.unwrap();
355        assert_eq!(all.len(), 1);
356        assert_eq!(all[0].0, expected_event);
357        assert_eq!(all[0].1, stream_log);
358
359        // send the wrong event and make sure it is NOT picked up by the event filter
360        let _wrong_receipt = contract
361            .doEmitWrongEvent()
362            .send()
363            .await
364            .unwrap()
365            .get_receipt()
366            .await
367            .expect("no receipt");
368
369        // we sent the wrong event
370        // so no events should be returned when querying event.query() (MyEvent)
371        let all = event.query().await.unwrap();
372        assert_eq!(all.len(), 0);
373
374        #[cfg(feature = "pubsub")]
375        {
376            let provider = alloy_provider::ProviderBuilder::new()
377                .wallet(wallet)
378                .connect(&anvil.ws_endpoint())
379                .await
380                .unwrap();
381
382            let contract = MyContract::new(*contract.address(), provider);
383            let event = contract.MyEvent_filter();
384
385            let sub = event.subscribe().await.unwrap();
386
387            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
388
389            let mut stream = sub.into_stream();
390
391            let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
392            assert_eq!(
393                MyContract::MyEvent::SIGNATURE_HASH.0,
394                stream_log.topics().first().unwrap().0
395            );
396            assert_eq!(stream_event, expected_event);
397            assert_eq!(stream_log.address(), *contract.address());
398            assert_eq!(stream_log.block_number, Some(4));
399
400            // send the request to emit the wrong event
401            contract
402                .doEmitWrongEvent()
403                .send()
404                .await
405                .unwrap()
406                .get_receipt()
407                .await
408                .expect("no receipt");
409
410            // we sent the wrong event
411            // so no events should be returned when querying event.query() (MyEvent)
412            let all = event.query().await.unwrap();
413            assert_eq!(all.len(), 0);
414        }
415    }
416
417    /// Same test as above, but using builder methods.
418    #[tokio::test]
419    async fn event_builder_filters() {
420        let _ = tracing_subscriber::fmt::try_init();
421
422        let anvil = alloy_node_bindings::Anvil::new().spawn();
423        let pk: PrivateKeySigner =
424            "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
425        let wallet = EthereumWallet::from(pk);
426        let provider = alloy_provider::ProviderBuilder::new()
427            .wallet(wallet.clone())
428            .on_http(anvil.endpoint_url());
429
430        let contract = MyContract::deploy(&provider).await.unwrap();
431
432        let event: Event<(), _, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
433            .address(*contract.address())
434            .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
435        let all = event.query().await.unwrap();
436        assert_eq!(all.len(), 0);
437
438        let poller = event.watch().await.unwrap();
439
440        let _receipt =
441            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
442
443        let expected_event = MyContract::MyEvent {
444            _0: 42,
445            _1: "hello".to_string(),
446            _2: true,
447            _3: U256::from(0xdeadbeefu64).into(),
448        };
449
450        let mut stream = poller.into_stream();
451        let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
452        assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); // add check that the received event signature is the same as the one we expect
453        assert_eq!(stream_event, expected_event);
454        assert_eq!(stream_log.inner.address, *contract.address());
455        assert_eq!(stream_log.block_number, Some(2));
456
457        // This is not going to return `None`
458        // assert!(stream.next().await.is_none());
459
460        let all = event.query().await.unwrap();
461        assert_eq!(all.len(), 1);
462        assert_eq!(all[0].0, expected_event);
463        assert_eq!(all[0].1, stream_log);
464
465        // send the wrong event and make sure it is NOT picked up by the event filter
466        let _wrong_receipt = contract
467            .doEmitWrongEvent()
468            .send()
469            .await
470            .unwrap()
471            .get_receipt()
472            .await
473            .expect("no receipt");
474
475        // we sent the wrong event
476        // so no events should be returned when querying event.query() (MyEvent)
477        let all = event.query().await.unwrap();
478        assert_eq!(all.len(), 0);
479
480        #[cfg(feature = "pubsub")]
481        {
482            let provider = alloy_provider::ProviderBuilder::new()
483                .wallet(wallet)
484                .connect(&anvil.ws_endpoint())
485                .await
486                .unwrap();
487
488            let contract = MyContract::new(*contract.address(), &provider);
489            let event: Event<(), _, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
490                .address(*contract.address())
491                .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
492
493            let sub = event.subscribe().await.unwrap();
494
495            contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
496
497            let mut stream = sub.into_stream();
498
499            let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
500            assert_eq!(
501                MyContract::MyEvent::SIGNATURE_HASH.0,
502                stream_log.topics().first().unwrap().0
503            );
504            assert_eq!(stream_event, expected_event);
505            assert_eq!(stream_log.address(), *contract.address());
506            assert_eq!(stream_log.block_number, Some(4));
507
508            // send the request to emit the wrong event
509            contract
510                .doEmitWrongEvent()
511                .send()
512                .await
513                .unwrap()
514                .get_receipt()
515                .await
516                .expect("no receipt");
517
518            // we sent the wrong event
519            // so no events should be returned when querying event.query() (MyEvent)
520            let all = event.query().await.unwrap();
521            assert_eq!(all.len(), 0);
522        }
523    }
524}