1use crate::Error;
2use alloy_network::Ethereum;
3use alloy_primitives::{Address, LogData, B256};
4use alloy_provider::{FilterPollerBuilder, Network, Provider};
5use alloy_rpc_types_eth::{BlockNumberOrTag, Filter, FilterBlockOption, Log, Topic, ValueOrArray};
6use alloy_sol_types::SolEvent;
7use alloy_transport::TransportResult;
8use futures::Stream;
9use futures_util::StreamExt;
10use std::{fmt, marker::PhantomData};
11
12#[must_use = "event filters do nothing unless you `query`, `watch`, or `stream` them"]
14pub struct Event<T, P, E, N = Ethereum> {
15 pub provider: P,
17 pub filter: Filter,
19 _phantom: PhantomData<(T, E, N)>,
20}
21
22impl<T, P: fmt::Debug, E, N> fmt::Debug for Event<T, P, E, N> {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("Event")
25 .field("provider", &self.provider)
26 .field("filter", &self.filter)
27 .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
28 .finish()
29 }
30}
31
32#[doc(hidden)]
33impl<'a, T: crate::private::Transport, P: Provider<N>, E: SolEvent, N: Network>
34 Event<T, &'a P, E, N>
35{
36 pub fn new_sol(provider: &'a P, address: &Address) -> Self {
39 if E::ANONYMOUS {
42 Self::new(provider, Filter::new().address(*address))
43 } else {
44 Self::new(provider, Filter::new().address(*address).event_signature(E::SIGNATURE_HASH))
45 }
46 }
47}
48
49impl<T, P: Provider<N>, E: SolEvent, N: Network> Event<T, P, E, N> {
50 pub const fn new(provider: P, filter: Filter) -> Self {
52 Self { provider, filter, _phantom: PhantomData }
53 }
54
55 pub async fn query(&self) -> Result<Vec<(E, Log)>, Error> {
57 let logs = self.query_raw().await?;
58 logs.into_iter().map(|log| Ok((decode_log(&log)?, log))).collect()
59 }
60
61 pub async fn query_raw(&self) -> TransportResult<Vec<Log>> {
64 self.provider.get_logs(&self.filter).await
65 }
66
67 #[doc(alias = "stream")]
71 #[doc(alias = "stream_with_meta")]
72 pub async fn watch(&self) -> TransportResult<EventPoller<E>> {
73 let poller = self.provider.watch_logs(&self.filter).await?;
74 Ok(poller.into())
75 }
76
77 #[cfg(feature = "pubsub")]
81 pub async fn subscribe(&self) -> TransportResult<subscription::EventSubscription<E>> {
82 let sub = self.provider.subscribe_logs(&self.filter).await?;
83 Ok(sub.into())
84 }
85
86 pub fn select(mut self, filter: impl Into<FilterBlockOption>) -> Self {
90 self.filter.block_option = filter.into();
91 self
92 }
93
94 pub fn from_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
96 self.filter.block_option = self.filter.block_option.with_from_block(block.into());
97 self
98 }
99
100 pub fn to_block<B: Into<BlockNumberOrTag>>(mut self, block: B) -> Self {
102 self.filter.block_option = self.filter.block_option.with_to_block(block.into());
103 self
104 }
105
106 pub fn is_pending_block_filter(&self) -> bool {
111 self.filter.block_option.get_from_block().is_some_and(BlockNumberOrTag::is_pending)
112 && self.filter.block_option.get_to_block().is_some_and(BlockNumberOrTag::is_pending)
113 }
114
115 pub fn at_block_hash<A: Into<B256>>(mut self, hash: A) -> Self {
117 self.filter.block_option = self.filter.block_option.with_block_hash(hash.into());
118 self
119 }
120
121 pub fn address<A: Into<ValueOrArray<Address>>>(mut self, address: A) -> Self {
125 self.filter.address = address.into().into();
126 self
127 }
128
129 pub fn event(mut self, event_name: &str) -> Self {
131 self.filter = self.filter.event(event_name);
132 self
133 }
134
135 pub fn events(mut self, events: impl IntoIterator<Item = impl AsRef<[u8]>>) -> Self {
137 self.filter = self.filter.events(events);
138 self
139 }
140
141 pub fn event_signature<TO: Into<Topic>>(mut self, topic: TO) -> Self {
143 self.filter.topics[0] = topic.into();
144 self
145 }
146
147 pub fn topic1<TO: Into<Topic>>(mut self, topic: TO) -> Self {
149 self.filter.topics[1] = topic.into();
150 self
151 }
152
153 pub fn topic2<TO: Into<Topic>>(mut self, topic: TO) -> Self {
155 self.filter.topics[2] = topic.into();
156 self
157 }
158
159 pub fn topic3<TO: Into<Topic>>(mut self, topic: TO) -> Self {
161 self.filter.topics[3] = topic.into();
162 self
163 }
164}
165
166impl<T, P: Clone, E, N> Event<T, &P, E, N> {
167 pub fn with_cloned_provider(self) -> Event<T, P, E, N> {
169 Event { provider: self.provider.clone(), filter: self.filter, _phantom: PhantomData }
170 }
171}
172
173pub struct EventPoller<E> {
177 pub poller: FilterPollerBuilder<Log>,
179 _phantom: PhantomData<E>,
180}
181
182impl<E> AsRef<FilterPollerBuilder<Log>> for EventPoller<E> {
183 #[inline]
184 fn as_ref(&self) -> &FilterPollerBuilder<Log> {
185 &self.poller
186 }
187}
188
189impl<E> AsMut<FilterPollerBuilder<Log>> for EventPoller<E> {
190 #[inline]
191 fn as_mut(&mut self) -> &mut FilterPollerBuilder<Log> {
192 &mut self.poller
193 }
194}
195
196impl<E> fmt::Debug for EventPoller<E> {
197 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
198 f.debug_struct("EventPoller")
199 .field("poller", &self.poller)
200 .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
201 .finish()
202 }
203}
204
205impl<E> From<FilterPollerBuilder<Log>> for EventPoller<E> {
206 fn from(poller: FilterPollerBuilder<Log>) -> Self {
207 Self { poller, _phantom: PhantomData }
208 }
209}
210
211impl<E: SolEvent> EventPoller<E> {
212 pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
216 self.poller
217 .into_stream()
218 .flat_map(futures_util::stream::iter)
219 .map(|log| decode_log(&log).map(|e| (e, log)))
220 }
221}
222
223fn decode_log<E: SolEvent>(log: &Log) -> alloy_sol_types::Result<E> {
224 let log_data: &LogData = log.as_ref();
225
226 E::decode_raw_log(log_data.topics().iter().copied(), &log_data.data, false)
227}
228
229#[cfg(feature = "pubsub")]
230pub(crate) mod subscription {
231 use super::*;
232 use alloy_pubsub::Subscription;
233
234 pub struct EventSubscription<E> {
238 pub sub: Subscription<Log>,
240 _phantom: PhantomData<E>,
241 }
242
243 impl<E> AsRef<Subscription<Log>> for EventSubscription<E> {
244 #[inline]
245 fn as_ref(&self) -> &Subscription<Log> {
246 &self.sub
247 }
248 }
249
250 impl<E> AsMut<Subscription<Log>> for EventSubscription<E> {
251 #[inline]
252 fn as_mut(&mut self) -> &mut Subscription<Log> {
253 &mut self.sub
254 }
255 }
256
257 impl<E> fmt::Debug for EventSubscription<E> {
258 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
259 f.debug_struct("EventSubscription")
260 .field("sub", &self.sub)
261 .field("event_type", &format_args!("{}", std::any::type_name::<E>()))
262 .finish()
263 }
264 }
265
266 impl<E> From<Subscription<Log>> for EventSubscription<E> {
267 fn from(sub: Subscription<Log>) -> Self {
268 Self { sub, _phantom: PhantomData }
269 }
270 }
271
272 impl<E: SolEvent> EventSubscription<E> {
273 pub fn into_stream(self) -> impl Stream<Item = alloy_sol_types::Result<(E, Log)>> + Unpin {
275 self.sub.into_stream().map(|log| decode_log(&log).map(|e| (e, log)))
276 }
277 }
278}
279
280#[cfg(test)]
281mod tests {
282 use super::*;
283 use alloy_network::EthereumWallet;
284 use alloy_primitives::U256;
285 use alloy_signer_local::PrivateKeySigner;
286 use alloy_sol_types::sol;
287
288 sol! {
289 #[sol(rpc, bytecode = "60808060405234601557610147908161001a8239f35b5f80fdfe6080806040526004361015610012575f80fd5b5f3560e01c908163299d8665146100a7575063ffdf4f1b14610032575f80fd5b346100a3575f3660031901126100a357602a7f6d10b8446ff0ac11bb95d154e7b10a73042fb9fc3bca0c92de5397b2fe78496c6040518061009e819060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a0840193600160208201520152565b0390a2005b5f80fd5b346100a3575f3660031901126100a3577f4e4cd44610926680098f1b54e2bdd1fb952659144c471173bbb9cf966af3a988818061009e602a949060608252600560608301526468656c6c6f60d81b608083015263deadbeef604060a084019360016020820152015256fea26469706673582212202e640cd14a7310d4165f902d2721ef5b4640a08f5ae38e9ae5c315a9f9f4435864736f6c63430008190033")]
291 #[allow(dead_code)]
292 contract MyContract {
293 #[derive(Debug, PartialEq, Eq)]
294 event MyEvent(uint64 indexed, string, bool, bytes32);
295
296 #[derive(Debug, PartialEq, Eq)]
297 event WrongEvent(uint64 indexed, string, bool, bytes32);
298
299 function doEmit() external {
300 emit MyEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
301 }
302
303 function doEmitWrongEvent() external {
304 emit WrongEvent(42, "hello", true, bytes32(uint256(0xdeadbeef)));
305 }
306 }
307 }
308
309 #[tokio::test]
310 async fn event_filters() {
311 let _ = tracing_subscriber::fmt::try_init();
312
313 let anvil = alloy_node_bindings::Anvil::new().spawn();
314
315 let pk: PrivateKeySigner =
316 "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
317 let wallet = EthereumWallet::from(pk);
318 let provider = alloy_provider::ProviderBuilder::new()
319 .wallet(wallet.clone())
320 .on_http(anvil.endpoint_url());
321
322 let contract = MyContract::deploy(&provider).await.unwrap();
324
325 let event: Event<(), _, MyContract::MyEvent, _> = Event::new(&provider, Filter::new());
326 let all = event.query().await.unwrap();
327 assert_eq!(all.len(), 0);
328
329 let event = contract.MyEvent_filter();
331
332 let poller = event.watch().await.unwrap();
333
334 let _receipt =
335 contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
336
337 let expected_event = MyContract::MyEvent {
338 _0: 42,
339 _1: "hello".to_string(),
340 _2: true,
341 _3: U256::from(0xdeadbeefu64).into(),
342 };
343
344 let mut stream = poller.into_stream();
345 let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
346 assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); assert_eq!(stream_event, expected_event);
348 assert_eq!(stream_log.inner.address, *contract.address());
349 assert_eq!(stream_log.block_number, Some(2));
350
351 let all = event.query().await.unwrap();
355 assert_eq!(all.len(), 1);
356 assert_eq!(all[0].0, expected_event);
357 assert_eq!(all[0].1, stream_log);
358
359 let _wrong_receipt = contract
361 .doEmitWrongEvent()
362 .send()
363 .await
364 .unwrap()
365 .get_receipt()
366 .await
367 .expect("no receipt");
368
369 let all = event.query().await.unwrap();
372 assert_eq!(all.len(), 0);
373
374 #[cfg(feature = "pubsub")]
375 {
376 let provider = alloy_provider::ProviderBuilder::new()
377 .wallet(wallet)
378 .connect(&anvil.ws_endpoint())
379 .await
380 .unwrap();
381
382 let contract = MyContract::new(*contract.address(), provider);
383 let event = contract.MyEvent_filter();
384
385 let sub = event.subscribe().await.unwrap();
386
387 contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
388
389 let mut stream = sub.into_stream();
390
391 let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
392 assert_eq!(
393 MyContract::MyEvent::SIGNATURE_HASH.0,
394 stream_log.topics().first().unwrap().0
395 );
396 assert_eq!(stream_event, expected_event);
397 assert_eq!(stream_log.address(), *contract.address());
398 assert_eq!(stream_log.block_number, Some(4));
399
400 contract
402 .doEmitWrongEvent()
403 .send()
404 .await
405 .unwrap()
406 .get_receipt()
407 .await
408 .expect("no receipt");
409
410 let all = event.query().await.unwrap();
413 assert_eq!(all.len(), 0);
414 }
415 }
416
417 #[tokio::test]
419 async fn event_builder_filters() {
420 let _ = tracing_subscriber::fmt::try_init();
421
422 let anvil = alloy_node_bindings::Anvil::new().spawn();
423 let pk: PrivateKeySigner =
424 "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80".parse().unwrap();
425 let wallet = EthereumWallet::from(pk);
426 let provider = alloy_provider::ProviderBuilder::new()
427 .wallet(wallet.clone())
428 .on_http(anvil.endpoint_url());
429
430 let contract = MyContract::deploy(&provider).await.unwrap();
431
432 let event: Event<(), _, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
433 .address(*contract.address())
434 .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
435 let all = event.query().await.unwrap();
436 assert_eq!(all.len(), 0);
437
438 let poller = event.watch().await.unwrap();
439
440 let _receipt =
441 contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
442
443 let expected_event = MyContract::MyEvent {
444 _0: 42,
445 _1: "hello".to_string(),
446 _2: true,
447 _3: U256::from(0xdeadbeefu64).into(),
448 };
449
450 let mut stream = poller.into_stream();
451 let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
452 assert_eq!(MyContract::MyEvent::SIGNATURE_HASH.0, stream_log.topics().first().unwrap().0); assert_eq!(stream_event, expected_event);
454 assert_eq!(stream_log.inner.address, *contract.address());
455 assert_eq!(stream_log.block_number, Some(2));
456
457 let all = event.query().await.unwrap();
461 assert_eq!(all.len(), 1);
462 assert_eq!(all[0].0, expected_event);
463 assert_eq!(all[0].1, stream_log);
464
465 let _wrong_receipt = contract
467 .doEmitWrongEvent()
468 .send()
469 .await
470 .unwrap()
471 .get_receipt()
472 .await
473 .expect("no receipt");
474
475 let all = event.query().await.unwrap();
478 assert_eq!(all.len(), 0);
479
480 #[cfg(feature = "pubsub")]
481 {
482 let provider = alloy_provider::ProviderBuilder::new()
483 .wallet(wallet)
484 .connect(&anvil.ws_endpoint())
485 .await
486 .unwrap();
487
488 let contract = MyContract::new(*contract.address(), &provider);
489 let event: Event<(), _, MyContract::MyEvent, _> = Event::new(&provider, Filter::new())
490 .address(*contract.address())
491 .event_signature(MyContract::MyEvent::SIGNATURE_HASH);
492
493 let sub = event.subscribe().await.unwrap();
494
495 contract.doEmit().send().await.unwrap().get_receipt().await.expect("no receipt");
496
497 let mut stream = sub.into_stream();
498
499 let (stream_event, stream_log) = stream.next().await.unwrap().unwrap();
500 assert_eq!(
501 MyContract::MyEvent::SIGNATURE_HASH.0,
502 stream_log.topics().first().unwrap().0
503 );
504 assert_eq!(stream_event, expected_event);
505 assert_eq!(stream_log.address(), *contract.address());
506 assert_eq!(stream_log.block_number, Some(4));
507
508 contract
510 .doEmitWrongEvent()
511 .send()
512 .await
513 .unwrap()
514 .get_receipt()
515 .await
516 .expect("no receipt");
517
518 let all = event.query().await.unwrap();
521 assert_eq!(all.len(), 0);
522 }
523 }
524}