ethers_providers/toolbox/
log_query.rs1use crate::{utils::PinBoxFut, JsonRpcClient, Middleware, Provider, ProviderError};
2use ethers_core::types::{Filter, Log, U64};
3use futures_core::stream::Stream;
4use std::{
5 collections::VecDeque,
6 pin::Pin,
7 task::{Context, Poll},
8};
9use thiserror::Error;
10
11pub struct LogQuery<'a, P> {
15 provider: &'a Provider<P>,
16 filter: Filter,
17 from_block: Option<U64>,
18 page_size: u64,
19 current_logs: VecDeque<Log>,
20 last_block: Option<U64>,
21 state: LogQueryState<'a>,
22}
23
24enum LogQueryState<'a> {
25 Initial,
26 LoadLastBlock(PinBoxFut<'a, U64>),
27 LoadLogs(PinBoxFut<'a, Vec<Log>>),
28 Consume,
29}
30
31impl<'a, P> LogQuery<'a, P>
32where
33 P: JsonRpcClient,
34{
35 pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self {
37 Self {
38 provider,
39 filter: filter.clone(),
40 from_block: filter.get_from_block(),
41 page_size: 10000,
42 current_logs: VecDeque::new(),
43 last_block: None,
44 state: LogQueryState::Initial,
45 }
46 }
47
48 pub fn with_page_size(mut self, page_size: u64) -> Self {
50 self.page_size = page_size;
51 self
52 }
53}
54
/// Stores `$next` in `$stream.state`, wakes the task behind `$cx` right away and
/// returns `Poll::Pending` from the enclosing function.
///
/// Because the waker is signalled before returning, the task is scheduled to be
/// polled again and continues from the freshly stored state.
macro_rules! rewake_with_new_state {
    ($cx:ident, $stream:ident, $next:expr) => {
        $stream.state = $next;
        $cx.waker().wake_by_ref();
        return Poll::Pending
    };
}
62
63#[derive(Error, Debug)]
65pub enum LogQueryError<E> {
66 #[error(transparent)]
68 LoadLastBlockError(E),
69 #[error(transparent)]
71 LoadLogsError(E),
72}
73
74impl<'a, P> Stream for LogQuery<'a, P>
75where
76 P: JsonRpcClient,
77{
78 type Item = Result<Log, LogQueryError<ProviderError>>;
79
80 fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81 match &mut self.state {
82 LogQueryState::Initial => {
83 if !self.filter.is_paginatable() {
84 let filter = self.filter.clone();
86 let provider = self.provider;
87 #[allow(clippy::redundant_async_block)]
88 let fut = Box::pin(async move { provider.get_logs(&filter).await });
89 rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
90 } else {
91 let fut = self.provider.get_block_number();
93 rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
94 }
95 }
96 LogQueryState::LoadLastBlock(fut) => {
97 match futures_util::ready!(fut.as_mut().poll(ctx)) {
98 Ok(last_block) => {
99 self.last_block = Some(last_block);
100
101 let from_block = self.filter.get_from_block().unwrap();
104 let to_block = from_block + self.page_size;
105 self.from_block = Some(to_block + 1);
106
107 let filter = self.filter.clone().from_block(from_block).to_block(to_block);
108 let provider = self.provider;
109 #[allow(clippy::redundant_async_block)]
111 let fut = Box::pin(async move { provider.get_logs(&filter).await });
112 rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
113 }
114 Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
115 }
116 }
117 LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
118 Ok(logs) => {
119 self.current_logs = VecDeque::from(logs);
120 rewake_with_new_state!(ctx, self, LogQueryState::Consume);
121 }
122 Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
123 },
124 LogQueryState::Consume => {
125 let log = self.current_logs.pop_front();
126 if log.is_none() {
127 if !self.filter.is_paginatable() {
129 Poll::Ready(None)
130 } else {
131 let from_block = self.from_block.unwrap();
134 let to_block = from_block + self.page_size;
135
136 if from_block > self.last_block.unwrap() {
139 return Poll::Ready(None)
140 }
141 self.from_block = Some(to_block + 1);
143
144 let filter = self.filter.clone().from_block(from_block).to_block(to_block);
145 let provider = self.provider;
146 #[allow(clippy::redundant_async_block)]
147 let fut = Box::pin(async move { provider.get_logs(&filter).await });
148 rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
149 }
150 } else {
151 Poll::Ready(log.map(Ok))
152 }
153 }
154 }
155 }
156}