ethcontract/
log.rs

1//! This module implements event builders and streams for retrieving events
2//! emitted by a contract.
3
4use crate::errors::ExecutionError;
5use ethcontract_common::abi::{Topic, TopicFilter};
6use futures::future::{self, TryFutureExt};
7use futures::stream::{self, Stream, TryStreamExt};
8use std::num::NonZeroU64;
9use std::time::Duration;
10use web3::api::Web3;
11use web3::error::Error as Web3Error;
12use web3::types::{Address, BlockId, BlockNumber, Filter, FilterBuilder, Log, H256};
13use web3::Transport;
14
/// Interval between consecutive polls for new logs that is used when the
/// builder does not specify one.
#[cfg(not(test))]
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Under `cfg(test)` the default poll interval is zero.
#[cfg(test)]
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::ZERO;

/// Number of blocks covered by one page of a paginated past-events query
/// when the builder does not specify a page size.
pub const DEFAULT_BLOCK_PAGE_SIZE: u64 = 10_000;
23
24/// A log filter builder for configuring either a query for past logs or a
25/// stream that constantly queries new logs and deals with re-orgs.
26#[derive(Debug)]
27#[must_use = "log filter builders do nothing unless you query or stream them"]
28pub struct LogFilterBuilder<T: Transport> {
29    /// The underlying web3 provider used for retrieving logs.
30    web3: Web3<T>,
31    /// The block to start streaming logs from.
32    ///
33    /// See [`web3::types::BlockNumber`] for more details on possible values.
34    pub from_block: Option<BlockNumber>,
35    /// The block to stop streaming logs from.
36    ///
37    /// See [`web3::types::BlockNumber`] for more details on possible values.
38    pub to_block: Option<BlockNumber>,
39    /// Block hash, mutually exclusive with pair `from_block` / `to_block`.
40    pub block_hash: Option<H256>,
41    /// The contract addresses to filter logs for.
42    pub address: Vec<Address>,
43    /// Topic filters used for filtering logs based on indexed topics.
44    pub topics: TopicFilter,
45    /// Limit the number of events that can be retrieved by this filter.
46    ///
47    /// Note that this is option is non-standard.
48    pub limit: Option<usize>,
49
50    /// The page size in blocks to use when doing a paginated query on past
51    /// logs. This provides no guarantee in how many logs will be returned per
52    /// page, but used to limit the block range for the query.
53    pub block_page_size: Option<NonZeroU64>,
54    /// The polling interval for querying the node for more logs.
55    pub poll_interval: Option<Duration>,
56}
57
58impl<T: Transport> LogFilterBuilder<T> {
59    /// Creates a new log filter builder from the specified web3 provider.
60    pub fn new(web3: Web3<T>) -> Self {
61        LogFilterBuilder {
62            web3,
63            from_block: None,
64            to_block: None,
65            address: Vec::new(),
66            topics: TopicFilter::default(),
67            limit: None,
68            block_page_size: None,
69            poll_interval: None,
70            block_hash: None,
71        }
72    }
73
74    /// Sets the starting block from which to stream logs for.
75    ///
76    /// If left unset defaults to the latest block.
77    #[allow(clippy::wrong_self_convention)]
78    pub fn from_block(mut self, block: BlockNumber) -> Self {
79        self.from_block = Some(block);
80        self
81    }
82
83    /// Sets the last block from which to stream logs for.
84    ///
85    /// If left unset defaults to the streaming until the end of days.
86    #[allow(clippy::wrong_self_convention)]
87    pub fn to_block(mut self, block: BlockNumber) -> Self {
88        self.to_block = Some(block);
89        self
90    }
91
92    /// Sets `block_hash`. The field `block_hash` and the pair `from_block` and
93    /// `to_block` are mutually exclusive.
94    pub fn block_hash(mut self, hash: H256) -> Self {
95        self.block_hash = Some(hash);
96        self
97    }
98
99    /// Adds an address filter to only retrieve logs that were emitted by a
100    /// contract matching the povided addresses.
101    pub fn address(mut self, address: Vec<Address>) -> Self {
102        self.address = address;
103        self
104    }
105
106    /// Adds a filter for the first indexed topic.
107    ///
108    /// For regular events, this corresponds to the event signature. For
109    /// anonymous events, this is the first indexed property.
110    pub fn topic0(mut self, topic: Topic<H256>) -> Self {
111        self.topics.topic0 = topic;
112        self
113    }
114
115    /// Adds a filter for the second indexed topic.
116    pub fn topic1(mut self, topic: Topic<H256>) -> Self {
117        self.topics.topic1 = topic;
118        self
119    }
120
121    /// Adds a filter for the third indexed topic.
122    pub fn topic2(mut self, topic: Topic<H256>) -> Self {
123        self.topics.topic2 = topic;
124        self
125    }
126
127    /// Adds a filter for the third indexed topic.
128    pub fn topic3(mut self, topic: Topic<H256>) -> Self {
129        self.topics.topic3 = topic;
130        self
131    }
132
133    /// Limit the number of events that can be retrieved by this filter.
134    ///
135    /// Note that this parameter is non-standard.
136    pub fn limit(mut self, value: usize) -> Self {
137        self.limit = Some(value);
138        self
139    }
140
141    /// The page size in blocks to use when doing a paginated query on past
142    /// events.
143    ///
144    /// # Panics
145    ///
146    /// Panics if a block page size of 0 is specified.
147    pub fn block_page_size(mut self, value: u64) -> Self {
148        self.block_page_size = Some(NonZeroU64::new(value).expect("block page size cannot be 0"));
149        self
150    }
151
152    /// The polling interval. This is used as the interval between consecutive
153    /// `eth_getLogs` calls to get log updates.
154    pub fn poll_interval(mut self, value: Duration) -> Self {
155        self.poll_interval = Some(value);
156        self
157    }
158
159    /// Returns a web3 filter builder needed for querying and streaming logs.
160    pub fn into_filter(self) -> FilterBuilder {
161        let mut filter = FilterBuilder::default();
162        if let Some(from_block) = self.from_block {
163            filter = filter.from_block(from_block);
164        }
165        if let Some(to_block) = self.to_block {
166            filter = filter.to_block(to_block);
167        }
168        if let Some(hash) = self.block_hash {
169            filter = filter.block_hash(hash);
170        }
171        if !self.address.is_empty() {
172            filter = filter.address(self.address);
173        }
174        if self.topics != TopicFilter::default() {
175            filter = filter.topics(
176                topic_to_option(self.topics.topic0),
177                topic_to_option(self.topics.topic1),
178                topic_to_option(self.topics.topic2),
179                topic_to_option(self.topics.topic3),
180            );
181        }
182        if let Some(limit) = self.limit {
183            filter = filter.limit(limit)
184        }
185
186        filter
187    }
188
189    /// Performs a `eth_getLogs` query to past logs. For large block ranges,
190    /// such as retrieving all contract logs since genesis, it is recommended to
191    /// use the `past_logs_pages` method instead.
192    pub async fn past_logs(self) -> Result<Vec<Log>, ExecutionError> {
193        let web3 = self.web3.clone();
194        let filter = self.into_filter();
195        let logs = web3.eth().logs(filter.build()).await?;
196
197        Ok(logs)
198    }
199
200    /// Returns a stream that resolves into a page of logs matching the filter
201    /// builder's parameters.
202    pub fn past_logs_pages(mut self) -> impl Stream<Item = Result<Vec<Log>, ExecutionError>> {
203        // NOTE: Ignore the `limit` option when doing paginated queries as it
204        //   can interfere.
205        self.limit = None;
206
207        stream::try_unfold(PastLogsStream::Init(self), PastLogsStream::next)
208            .try_filter(|logs| future::ready(!logs.is_empty()))
209    }
210
211    /// Creates a filter-based log stream that emits logs for each filter change.
212    pub fn stream(self) -> impl Stream<Item = Result<Log, ExecutionError>> {
213        let web3 = self.web3.clone();
214        let poll_interval = self.poll_interval.unwrap_or(DEFAULT_POLL_INTERVAL);
215        let filter = self.into_filter();
216
217        async move {
218            let eth_filter = web3
219                .eth_filter()
220                .create_logs_filter(filter.build())
221                .await
222                .map_err(ExecutionError::from)?;
223            let stream = eth_filter
224                .stream(poll_interval)
225                .map_err(ExecutionError::from);
226
227            Ok(stream)
228        }
229        .try_flatten_stream()
230    }
231}
232
233/// Converts a `Topic` to an equivalent `Option<Vec<T>>`, suitable for `FilterBuilder::topics`
234fn topic_to_option(topic: Topic<H256>) -> Option<Vec<H256>> {
235    match topic {
236        Topic::Any => None,
237        Topic::OneOf(v) => Some(v),
238        Topic::This(t) => Some(vec![t]),
239    }
240}
241
242/// Internal unfold context for creating a `past_logs` `Stream`.
243enum PastLogsStream<T: Transport> {
244    Init(LogFilterBuilder<T>),
245    Done,
246    Paging(PastLogsPager<T>),
247    Querying(Web3<T>, Filter),
248}
249
250async fn block_number(
251    web3: &Web3<impl Transport>,
252    block: BlockNumber,
253) -> Result<Option<u64>, Web3Error> {
254    if let BlockNumber::Number(number) = block {
255        return Ok(Some(number.as_u64()));
256    }
257    let block_ = web3.eth().block(BlockId::Number(block)).await?;
258    let Some(block_) = block_ else {
259        return Err(Web3Error::InvalidResponse(format!(
260            "block {block:?} does not exist"
261        )));
262    };
263    Ok(block_.number.map(|n| n.as_u64()))
264}
265
266impl<T: Transport> PastLogsStream<T> {
267    async fn next(mut self) -> Result<Option<(Vec<Log>, Self)>, ExecutionError> {
268        loop {
269            let (logs, next) = match self {
270                PastLogsStream::Init(builder) => {
271                    self = PastLogsStream::init(builder).await?;
272                    continue;
273                }
274                PastLogsStream::Done => return Ok(None),
275                PastLogsStream::Paging(mut pager) => {
276                    let logs = match pager.next_page().await? {
277                        Some(logs) => logs,
278                        None => return Ok(None),
279                    };
280                    (logs, PastLogsStream::Paging(pager))
281                }
282                PastLogsStream::Querying(web3, filter) => {
283                    let logs = web3.eth().logs(filter.clone()).await?;
284                    (logs, PastLogsStream::Done)
285                }
286            };
287            return Ok(Some((logs, next)));
288        }
289    }
290
291    async fn init(builder: LogFilterBuilder<T>) -> Result<Self, ExecutionError> {
292        let from_block = builder.from_block.unwrap_or(BlockNumber::Latest);
293        let to_block = builder.to_block.unwrap_or(BlockNumber::Latest);
294
295        let web3 = builder.web3.clone();
296        let block_page_size = builder
297            .block_page_size
298            .map(|size| size.get())
299            .unwrap_or(DEFAULT_BLOCK_PAGE_SIZE);
300        let filter = builder.into_filter();
301
302        let start_block = match from_block {
303            BlockNumber::Earliest => Some(0),
304            BlockNumber::Number(value) => Some(value.as_u64()),
305            BlockNumber::Latest | BlockNumber::Pending => None,
306            BlockNumber::Safe | BlockNumber::Finalized => block_number(&web3, from_block).await?,
307        };
308        let end_block = match to_block {
309            BlockNumber::Earliest => None,
310            BlockNumber::Number(value) => Some(value.as_u64()),
311            BlockNumber::Latest | BlockNumber::Pending => {
312                let latest_block = web3.eth().block_number().await?;
313                Some(latest_block.as_u64())
314            }
315            BlockNumber::Safe | BlockNumber::Finalized => block_number(&web3, to_block).await?,
316        };
317
318        let next = match (start_block, end_block) {
319            (Some(page_block), Some(end_block)) => PastLogsStream::Paging(PastLogsPager {
320                web3,
321                to_block,
322                block_page_size,
323                filter,
324                page_block,
325                end_block,
326            }),
327            _ => PastLogsStream::Querying(web3, filter.build()),
328        };
329
330        Ok(next)
331    }
332}
333
334/// Internal state for paging though past logs.
335struct PastLogsPager<T: Transport> {
336    web3: Web3<T>,
337
338    /// The `to_block` specified by the log filter.
339    to_block: BlockNumber,
340    /// The block page size being used for queries.
341    block_page_size: u64,
342    /// The web3 filter used for retrieving the logs.
343    filter: FilterBuilder,
344
345    /// The block number for the next page.
346    page_block: u64,
347    /// The last block used for pagination. This is slightly different than
348    /// `to_block` as this must be a concrete block number (and can't be block
349    /// aliases such as `Earliest` or `Latest`).
350    end_block: u64,
351}
352
353impl<T: Transport> PastLogsPager<T> {
354    async fn next_page(&mut self) -> Result<Option<Vec<Log>>, ExecutionError> {
355        debug_assert!(
356            self.block_page_size != 0,
357            "pager should never be constructed with 0 block page size",
358        );
359
360        while self.page_block <= self.end_block {
361            // NOTE: Log block ranges are inclusive.
362            let page_end = self.page_block + self.block_page_size - 1;
363            let page_to_block = if page_end < self.end_block {
364                BlockNumber::Number(page_end.into())
365            } else {
366                // NOTE: The last page is handled a bit differently by using the
367                //   `to_block` that was originally specified to the builder.
368                //   This is done because the `end_block` is determined when the
369                //   pager is created, and it is possible that the `to_block`
370                //   was specified as "latest" or "pending" which may have
371                //   changed since the paging started.
372                self.to_block
373            };
374
375            let page = self
376                .web3
377                .eth()
378                .logs(
379                    self.filter
380                        .clone()
381                        .from_block(self.page_block.into())
382                        .to_block(page_to_block)
383                        .build(),
384                )
385                .await?;
386
387            self.page_block = page_end + 1;
388            if page.is_empty() {
389                continue;
390            }
391
392            return Ok(Some(page));
393        }
394
395        Ok(None)
396    }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::prelude::*;
    use futures::stream::StreamExt;
    use serde_json::Value;
    use web3::types::U64;

    /// Builds the JSON of a log whose `logType` is `kind`; every other field
    /// is zeroed or empty.
    fn generate_log(kind: &str) -> Value {
        json!({
            "address": Address::zero(),
            "topics": [],
            "data": "0x",
            "blockHash": H256::zero(),
            "blockNumber": "0x0",
            "transactionHash": H256::zero(),
            "transactionIndex": "0x0",
            "logIndex": "0x0",
            "transactionLogIndex": "0x0",
            "logType": kind,
            "removed": false,
        })
    }

    /// A plain `past_logs` query forwards every filter option and performs
    /// exactly one `eth_getLogs` request.
    #[test]
    fn past_logs_options() {
        let mut transport = TestTransport::new();
        let web3 = Web3::new(transport.clone());

        let address = Address::repeat_byte(0x42);
        let topics: Vec<H256> = (0..=3).map(H256::repeat_byte).collect();

        // Response for the `eth_getLogs` request.
        transport.add_response(json!([generate_log("awesome")]));

        let builder = LogFilterBuilder::new(web3)
            .from_block(66.into())
            .to_block(BlockNumber::Pending)
            .address(vec![address])
            .topic0(Topic::This(topics[0]))
            .topic1(Topic::This(topics[1]))
            .topic2(Topic::This(topics[2]))
            .topic3(Topic::OneOf(vec![topics[3]; 3]))
            .limit(42)
            // NOTE: The next two options should get ignored.
            .block_page_size(5)
            .poll_interval(Duration::from_secs(100));
        let logs = builder
            .past_logs()
            .immediate()
            .expect("failed to get past logs");

        assert_eq!(logs[0].log_type.as_deref(), Some("awesome"));
        transport.assert_request(
            "eth_getLogs",
            &[json!({
                "address": address,
                "fromBlock": U64::from(66),
                "toBlock": BlockNumber::Pending,
                "topics": [
                    topics[0],
                    topics[1],
                    topics[2],
                    vec![topics[3]; 3],
                ],
                "limit": 42,
            })],
        );
        transport.assert_no_more_requests();
    }

    /// A paginated query splits the block range into pages, skips empty
    /// pages and uses the original `to_block` for the last page.
    #[test]
    fn past_log_stream_logs() {
        let mut transport = TestTransport::new();
        let web3 = Web3::new(transport.clone());

        let address = Address::repeat_byte(0x42);
        let topic = H256::repeat_byte(42);
        let log = generate_log("awesome");

        // Response for the latest block number request.
        transport.add_response(json!(U64::from(20)));
        // Responses for the three log pages.
        transport.add_response(json!([log]));
        transport.add_response(json!([]));
        transport.add_response(json!([log, log]));

        let mut pages = LogFilterBuilder::new(web3)
            .from_block(10.into())
            .to_block(BlockNumber::Pending)
            .address(vec![address])
            .topic0(Topic::This(topic))
            .limit(42) // NOTE: This should get ignored.
            .block_page_size(5)
            .past_logs_pages()
            .boxed();

        let next = pages.next().immediate();
        assert!(
            matches!(&next, Some(Ok(logs)) if logs.len() == 1),
            "expected page length of 1 but got {:?}",
            next,
        );

        let next = pages.next().immediate();
        assert!(
            matches!(&next, Some(Ok(logs)) if logs.len() == 2),
            "expected page length of 2 but got {:?}",
            next,
        );

        let next = pages.next().immediate();
        assert!(
            next.is_none(),
            "expected stream to be complete but got {:?}",
            next,
        );

        transport.assert_request("eth_blockNumber", &[]);
        // Block range of each page request, in the order they were sent.
        let expected_pages = [
            (10_u64, json!(U64::from(14))),
            (15, json!(U64::from(19))),
            (20, json!("pending")),
        ];
        for (from_block, to_block) in expected_pages {
            transport.assert_request(
                "eth_getLogs",
                &[json!({
                    "address": address,
                    "fromBlock": U64::from(from_block),
                    "toBlock": to_block,
                    "topics": [topic],
                })],
            );
        }
        transport.assert_no_more_requests();
    }

    /// Streaming creates a filter on the node and yields the logs reported
    /// by the filter changes.
    #[test]
    fn log_stream_next_log() {
        let mut transport = TestTransport::new();
        let web3 = Web3::new(transport.clone());

        // Response for the filter creation.
        transport.add_response(json!("0xf0"));
        // Response for the filter changes request.
        transport.add_response(json!([generate_log("awesome")]));

        let mut logs = LogFilterBuilder::new(web3).stream().boxed();
        let log = logs
            .next()
            .wait()
            .expect("log stream did not produce any logs")
            .expect("failed to get log from log stream");

        assert_eq!(log.log_type.as_deref(), Some("awesome"));
        transport.assert_request("eth_newFilter", &[json!({})]);
        transport.assert_request("eth_getFilterChanges", &[json!("0xf0")]);
        transport.assert_no_more_requests();
    }
}