1use crate::errors::ExecutionError;
5use ethcontract_common::abi::{Topic, TopicFilter};
6use futures::future::{self, TryFutureExt};
7use futures::stream::{self, Stream, TryStreamExt};
8use std::num::NonZeroU64;
9use std::time::Duration;
10use web3::api::Web3;
11use web3::error::Error as Web3Error;
12use web3::types::{Address, BlockId, BlockNumber, Filter, FilterBuilder, Log, H256};
13use web3::Transport;
14
/// Interval between two polls of a streamed log filter when the builder does
/// not specify its own `poll_interval`.
#[cfg(not(test))]
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Test builds poll without any delay between requests.
#[cfg(test)]
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::ZERO;

/// Number of blocks covered by a single page of a paged past-logs query when
/// the builder does not specify its own `block_page_size`.
pub const DEFAULT_BLOCK_PAGE_SIZE: u64 = 10_000;
23
/// Builder describing a query for Ethereum logs.
///
/// The builder can be turned into a `web3` filter (`into_filter`), queried
/// once (`past_logs`), queried page by page (`past_logs_pages`) or streamed
/// (`stream`).
#[derive(Debug)]
#[must_use = "log filter builders do nothing unless you query or stream them"]
pub struct LogFilterBuilder<T: Transport> {
    /// Web3 handle used to send the requests.
    web3: Web3<T>,
    /// First block of the queried range. Only applied to the filter when set;
    /// paged queries fall back to `BlockNumber::Latest` when unset.
    pub from_block: Option<BlockNumber>,
    /// Last block of the queried range. Only applied to the filter when set;
    /// paged queries fall back to `BlockNumber::Latest` when unset.
    pub to_block: Option<BlockNumber>,
    /// Hash of the block to query. Applied to the filter after the
    /// `from_block`/`to_block` range when set.
    pub block_hash: Option<H256>,
    /// Addresses the logs must originate from. An empty list adds no address
    /// constraint to the filter.
    pub address: Vec<Address>,
    /// Topic constraints. Only applied when different from
    /// `TopicFilter::default()`.
    pub topics: TopicFilter,
    /// Limit forwarded to the filter. Cleared by `past_logs_pages`.
    pub limit: Option<usize>,

    /// Number of blocks per page for paged queries; `DEFAULT_BLOCK_PAGE_SIZE`
    /// is used when unset.
    pub block_page_size: Option<NonZeroU64>,
    /// Polling interval for streamed queries; `DEFAULT_POLL_INTERVAL` is used
    /// when unset.
    pub poll_interval: Option<Duration>,
}
57
58impl<T: Transport> LogFilterBuilder<T> {
59 pub fn new(web3: Web3<T>) -> Self {
61 LogFilterBuilder {
62 web3,
63 from_block: None,
64 to_block: None,
65 address: Vec::new(),
66 topics: TopicFilter::default(),
67 limit: None,
68 block_page_size: None,
69 poll_interval: None,
70 block_hash: None,
71 }
72 }
73
74 #[allow(clippy::wrong_self_convention)]
78 pub fn from_block(mut self, block: BlockNumber) -> Self {
79 self.from_block = Some(block);
80 self
81 }
82
83 #[allow(clippy::wrong_self_convention)]
87 pub fn to_block(mut self, block: BlockNumber) -> Self {
88 self.to_block = Some(block);
89 self
90 }
91
92 pub fn block_hash(mut self, hash: H256) -> Self {
95 self.block_hash = Some(hash);
96 self
97 }
98
99 pub fn address(mut self, address: Vec<Address>) -> Self {
102 self.address = address;
103 self
104 }
105
106 pub fn topic0(mut self, topic: Topic<H256>) -> Self {
111 self.topics.topic0 = topic;
112 self
113 }
114
115 pub fn topic1(mut self, topic: Topic<H256>) -> Self {
117 self.topics.topic1 = topic;
118 self
119 }
120
121 pub fn topic2(mut self, topic: Topic<H256>) -> Self {
123 self.topics.topic2 = topic;
124 self
125 }
126
127 pub fn topic3(mut self, topic: Topic<H256>) -> Self {
129 self.topics.topic3 = topic;
130 self
131 }
132
133 pub fn limit(mut self, value: usize) -> Self {
137 self.limit = Some(value);
138 self
139 }
140
141 pub fn block_page_size(mut self, value: u64) -> Self {
148 self.block_page_size = Some(NonZeroU64::new(value).expect("block page size cannot be 0"));
149 self
150 }
151
152 pub fn poll_interval(mut self, value: Duration) -> Self {
155 self.poll_interval = Some(value);
156 self
157 }
158
159 pub fn into_filter(self) -> FilterBuilder {
161 let mut filter = FilterBuilder::default();
162 if let Some(from_block) = self.from_block {
163 filter = filter.from_block(from_block);
164 }
165 if let Some(to_block) = self.to_block {
166 filter = filter.to_block(to_block);
167 }
168 if let Some(hash) = self.block_hash {
169 filter = filter.block_hash(hash);
170 }
171 if !self.address.is_empty() {
172 filter = filter.address(self.address);
173 }
174 if self.topics != TopicFilter::default() {
175 filter = filter.topics(
176 topic_to_option(self.topics.topic0),
177 topic_to_option(self.topics.topic1),
178 topic_to_option(self.topics.topic2),
179 topic_to_option(self.topics.topic3),
180 );
181 }
182 if let Some(limit) = self.limit {
183 filter = filter.limit(limit)
184 }
185
186 filter
187 }
188
189 pub async fn past_logs(self) -> Result<Vec<Log>, ExecutionError> {
193 let web3 = self.web3.clone();
194 let filter = self.into_filter();
195 let logs = web3.eth().logs(filter.build()).await?;
196
197 Ok(logs)
198 }
199
200 pub fn past_logs_pages(mut self) -> impl Stream<Item = Result<Vec<Log>, ExecutionError>> {
203 self.limit = None;
206
207 stream::try_unfold(PastLogsStream::Init(self), PastLogsStream::next)
208 .try_filter(|logs| future::ready(!logs.is_empty()))
209 }
210
211 pub fn stream(self) -> impl Stream<Item = Result<Log, ExecutionError>> {
213 let web3 = self.web3.clone();
214 let poll_interval = self.poll_interval.unwrap_or(DEFAULT_POLL_INTERVAL);
215 let filter = self.into_filter();
216
217 async move {
218 let eth_filter = web3
219 .eth_filter()
220 .create_logs_filter(filter.build())
221 .await
222 .map_err(ExecutionError::from)?;
223 let stream = eth_filter
224 .stream(poll_interval)
225 .map_err(ExecutionError::from);
226
227 Ok(stream)
228 }
229 .try_flatten_stream()
230 }
231}
232
233fn topic_to_option(topic: Topic<H256>) -> Option<Vec<H256>> {
235 match topic {
236 Topic::Any => None,
237 Topic::OneOf(v) => Some(v),
238 Topic::This(t) => Some(vec![t]),
239 }
240}
241
/// State of the stream returned by `LogFilterBuilder::past_logs_pages`.
enum PastLogsStream<T: Transport> {
    /// Nothing has been requested yet; the builder still has to be turned
    /// into one of the other states.
    Init(LogFilterBuilder<T>),
    /// No more logs will be produced.
    Done,
    /// Logs are fetched page by page through the pager.
    Paging(PastLogsPager<T>),
    /// A single query with the given filter remains; afterwards the stream is
    /// `Done`.
    Querying(Web3<T>, Filter),
}
249
250async fn block_number(
251 web3: &Web3<impl Transport>,
252 block: BlockNumber,
253) -> Result<Option<u64>, Web3Error> {
254 if let BlockNumber::Number(number) = block {
255 return Ok(Some(number.as_u64()));
256 }
257 let block_ = web3.eth().block(BlockId::Number(block)).await?;
258 let Some(block_) = block_ else {
259 return Err(Web3Error::InvalidResponse(format!(
260 "block {block:?} does not exist"
261 )));
262 };
263 Ok(block_.number.map(|n| n.as_u64()))
264}
265
266impl<T: Transport> PastLogsStream<T> {
267 async fn next(mut self) -> Result<Option<(Vec<Log>, Self)>, ExecutionError> {
268 loop {
269 let (logs, next) = match self {
270 PastLogsStream::Init(builder) => {
271 self = PastLogsStream::init(builder).await?;
272 continue;
273 }
274 PastLogsStream::Done => return Ok(None),
275 PastLogsStream::Paging(mut pager) => {
276 let logs = match pager.next_page().await? {
277 Some(logs) => logs,
278 None => return Ok(None),
279 };
280 (logs, PastLogsStream::Paging(pager))
281 }
282 PastLogsStream::Querying(web3, filter) => {
283 let logs = web3.eth().logs(filter.clone()).await?;
284 (logs, PastLogsStream::Done)
285 }
286 };
287 return Ok(Some((logs, next)));
288 }
289 }
290
291 async fn init(builder: LogFilterBuilder<T>) -> Result<Self, ExecutionError> {
292 let from_block = builder.from_block.unwrap_or(BlockNumber::Latest);
293 let to_block = builder.to_block.unwrap_or(BlockNumber::Latest);
294
295 let web3 = builder.web3.clone();
296 let block_page_size = builder
297 .block_page_size
298 .map(|size| size.get())
299 .unwrap_or(DEFAULT_BLOCK_PAGE_SIZE);
300 let filter = builder.into_filter();
301
302 let start_block = match from_block {
303 BlockNumber::Earliest => Some(0),
304 BlockNumber::Number(value) => Some(value.as_u64()),
305 BlockNumber::Latest | BlockNumber::Pending => None,
306 BlockNumber::Safe | BlockNumber::Finalized => block_number(&web3, from_block).await?,
307 };
308 let end_block = match to_block {
309 BlockNumber::Earliest => None,
310 BlockNumber::Number(value) => Some(value.as_u64()),
311 BlockNumber::Latest | BlockNumber::Pending => {
312 let latest_block = web3.eth().block_number().await?;
313 Some(latest_block.as_u64())
314 }
315 BlockNumber::Safe | BlockNumber::Finalized => block_number(&web3, to_block).await?,
316 };
317
318 let next = match (start_block, end_block) {
319 (Some(page_block), Some(end_block)) => PastLogsStream::Paging(PastLogsPager {
320 web3,
321 to_block,
322 block_page_size,
323 filter,
324 page_block,
325 end_block,
326 }),
327 _ => PastLogsStream::Querying(web3, filter.build()),
328 };
329
330 Ok(next)
331 }
332}
333
/// Cursor over a block range that fetches logs one page of blocks at a time.
struct PastLogsPager<T: Transport> {
    /// Web3 handle used to send the page queries.
    web3: Web3<T>,

    /// Upper bound as originally requested; sent unchanged for the last page.
    to_block: BlockNumber,
    /// Number of blocks covered by one page.
    block_page_size: u64,
    /// Base filter; cloned for every page with that page's block range
    /// applied.
    filter: FilterBuilder,

    /// First block of the next page to query.
    page_block: u64,
    /// Last block of the range, as a number.
    end_block: u64,
}
352
353impl<T: Transport> PastLogsPager<T> {
354 async fn next_page(&mut self) -> Result<Option<Vec<Log>>, ExecutionError> {
355 debug_assert!(
356 self.block_page_size != 0,
357 "pager should never be constructed with 0 block page size",
358 );
359
360 while self.page_block <= self.end_block {
361 let page_end = self.page_block + self.block_page_size - 1;
363 let page_to_block = if page_end < self.end_block {
364 BlockNumber::Number(page_end.into())
365 } else {
366 self.to_block
373 };
374
375 let page = self
376 .web3
377 .eth()
378 .logs(
379 self.filter
380 .clone()
381 .from_block(self.page_block.into())
382 .to_block(page_to_block)
383 .build(),
384 )
385 .await?;
386
387 self.page_block = page_end + 1;
388 if page.is_empty() {
389 continue;
390 }
391
392 return Ok(Some(page));
393 }
394
395 Ok(None)
396 }
397}
398
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test::prelude::*;
    use futures::stream::StreamExt;
    use serde_json::Value;
    use web3::types::U64;

    /// Builds the JSON for a log whose `logType` is `kind`, so tests can tell
    /// returned logs apart.
    fn generate_log(kind: &str) -> Value {
        json!({
            "address": Address::zero(),
            "topics": [],
            "data": "0x",
            "blockHash": H256::zero(),
            "blockNumber": "0x0",
            "transactionHash": H256::zero(),
            "transactionIndex": "0x0",
            "logIndex": "0x0",
            "transactionLogIndex": "0x0",
            "logType": kind,
            "removed": false,
        })
    }

    /// `past_logs` sends every configured filter option in a single
    /// `eth_getLogs` request.
    #[test]
    fn past_logs_options() {
        let mut transport = TestTransport::new();
        let web3 = Web3::new(transport.clone());

        let address = Address::repeat_byte(0x42);
        let topics = (0..=3).map(H256::repeat_byte).collect::<Vec<_>>();

        // Response to the `eth_getLogs` request.
        transport.add_response(json!([generate_log("awesome")]));

        let logs = LogFilterBuilder::new(web3)
            .from_block(66.into())
            .to_block(BlockNumber::Pending)
            .address(vec![address])
            .topic0(Topic::This(topics[0]))
            .topic1(Topic::This(topics[1]))
            .topic2(Topic::This(topics[2]))
            .topic3(Topic::OneOf(vec![topics[3]; 3]))
            .limit(42)
            // The page size and poll interval are not part of the filter, so
            // they do not show up in the request asserted below.
            .block_page_size(5)
            .poll_interval(Duration::from_secs(100))
            .past_logs()
            .immediate()
            .expect("failed to get past logs");

        assert_eq!(logs[0].log_type.as_deref(), Some("awesome"));
        transport.assert_request(
            "eth_getLogs",
            &[json!({
                "address": address,
                "fromBlock": U64::from(66),
                "toBlock": BlockNumber::Pending,
                "topics": [
                    topics[0],
                    topics[1],
                    topics[2],
                    vec![topics[3]; 3],
                ],
                "limit": 42,
            })],
        );
        transport.assert_no_more_requests();
    }

    /// `past_logs_pages` splits the block range into pages, skips empty pages
    /// and uses the original `to_block` for the last page.
    #[test]
    fn past_log_stream_logs() {
        let mut transport = TestTransport::new();
        let web3 = Web3::new(transport.clone());

        let address = Address::repeat_byte(0x42);
        let topic = H256::repeat_byte(42);
        let log = generate_log("awesome");

        // Response to `eth_blockNumber`, used to resolve the pending
        // `to_block` to a number.
        transport.add_response(json!(U64::from(20)));
        // Responses for the three pages: blocks 10-14, 15-19 (empty, so it is
        // skipped) and 20-pending.
        transport.add_response(json!([log]));
        transport.add_response(json!([]));
        transport.add_response(json!([log, log]));

        let mut raw_events = LogFilterBuilder::new(web3)
            .from_block(10.into())
            .to_block(BlockNumber::Pending)
            .address(vec![address])
            .topic0(Topic::This(topic))
            // The limit is cleared by `past_logs_pages`; the requests asserted
            // below carry no `limit`.
            .limit(42)
            .block_page_size(5)
            .past_logs_pages()
            .boxed();

        let next = raw_events.next().immediate();
        assert!(
            matches!(&next, Some(Ok(logs)) if logs.len() == 1),
            "expected page length of 1 but got {:?}",
            next,
        );

        let next = raw_events.next().immediate();
        assert!(
            matches!(&next, Some(Ok(logs)) if logs.len() == 2),
            "expected page length of 2 but got {:?}",
            next,
        );

        let next = raw_events.next().immediate();
        assert!(
            next.is_none(),
            "expected stream to be complete but got {:?}",
            next,
        );

        transport.assert_request("eth_blockNumber", &[]);
        transport.assert_request(
            "eth_getLogs",
            &[json!({
                "address": address,
                "fromBlock": U64::from(10),
                "toBlock": U64::from(14),
                "topics": [topic],
            })],
        );
        transport.assert_request(
            "eth_getLogs",
            &[json!({
                "address": address,
                "fromBlock": U64::from(15),
                "toBlock": U64::from(19),
                "topics": [topic],
            })],
        );
        transport.assert_request(
            "eth_getLogs",
            &[json!({
                "address": address,
                "fromBlock": U64::from(20),
                "toBlock": "pending",
                "topics": [topic],
            })],
        );
        transport.assert_no_more_requests();
    }

    /// `stream` installs a filter and yields the logs reported by polling it.
    #[test]
    fn log_stream_next_log() {
        let mut transport = TestTransport::new();
        let web3 = Web3::new(transport.clone());

        // Filter id returned by `eth_newFilter`.
        transport.add_response(json!("0xf0"));
        // Logs returned by the first `eth_getFilterChanges` poll.
        transport.add_response(json!([generate_log("awesome")]));

        let log = LogFilterBuilder::new(web3)
            .stream()
            .boxed()
            .next()
            .wait()
            .expect("log stream did not produce any logs")
            .expect("failed to get log from log stream");

        assert_eq!(log.log_type.as_deref(), Some("awesome"));
        transport.assert_request("eth_newFilter", &[json!({})]);
        transport.assert_request("eth_getFilterChanges", &[json!("0xf0")]);
        transport.assert_no_more_requests();
    }
}