ethers_providers/toolbox/
log_query.rs

1use crate::{utils::PinBoxFut, JsonRpcClient, Middleware, Provider, ProviderError};
2use ethers_core::types::{Filter, Log, U64};
3use futures_core::stream::Stream;
4use std::{
5    collections::VecDeque,
6    pin::Pin,
7    task::{Context, Poll},
8};
9use thiserror::Error;
10
11/// A log query provides streaming access to historical logs via a paginated
12/// request. For streaming access to future logs, use [`Middleware::watch`] or
13/// [`Middleware::subscribe_logs`]
14pub struct LogQuery<'a, P> {
15    provider: &'a Provider<P>,
16    filter: Filter,
17    from_block: Option<U64>,
18    page_size: u64,
19    current_logs: VecDeque<Log>,
20    last_block: Option<U64>,
21    state: LogQueryState<'a>,
22}
23
24enum LogQueryState<'a> {
25    Initial,
26    LoadLastBlock(PinBoxFut<'a, U64>),
27    LoadLogs(PinBoxFut<'a, Vec<Log>>),
28    Consume,
29}
30
31impl<'a, P> LogQuery<'a, P>
32where
33    P: JsonRpcClient,
34{
35    /// Instantiate a new `LogQuery`
36    pub fn new(provider: &'a Provider<P>, filter: &Filter) -> Self {
37        Self {
38            provider,
39            filter: filter.clone(),
40            from_block: filter.get_from_block(),
41            page_size: 10000,
42            current_logs: VecDeque::new(),
43            last_block: None,
44            state: LogQueryState::Initial,
45        }
46    }
47
48    /// set page size for pagination
49    pub fn with_page_size(mut self, page_size: u64) -> Self {
50        self.page_size = page_size;
51        self
52    }
53}
54
55macro_rules! rewake_with_new_state {
56    ($ctx:ident, $this:ident, $new_state:expr) => {
57        $this.state = $new_state;
58        $ctx.waker().wake_by_ref();
59        return Poll::Pending
60    };
61}
62
63/// Errors while querying for logs
64#[derive(Error, Debug)]
65pub enum LogQueryError<E> {
66    /// Error loading latest block
67    #[error(transparent)]
68    LoadLastBlockError(E),
69    /// Error loading logs from block range
70    #[error(transparent)]
71    LoadLogsError(E),
72}
73
74impl<'a, P> Stream for LogQuery<'a, P>
75where
76    P: JsonRpcClient,
77{
78    type Item = Result<Log, LogQueryError<ProviderError>>;
79
80    fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81        match &mut self.state {
82            LogQueryState::Initial => {
83                if !self.filter.is_paginatable() {
84                    // if not paginatable, load logs and consume
85                    let filter = self.filter.clone();
86                    let provider = self.provider;
87                    #[allow(clippy::redundant_async_block)]
88                    let fut = Box::pin(async move { provider.get_logs(&filter).await });
89                    rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
90                } else {
91                    // if paginatable, load last block
92                    let fut = self.provider.get_block_number();
93                    rewake_with_new_state!(ctx, self, LogQueryState::LoadLastBlock(fut));
94                }
95            }
96            LogQueryState::LoadLastBlock(fut) => {
97                match futures_util::ready!(fut.as_mut().poll(ctx)) {
98                    Ok(last_block) => {
99                        self.last_block = Some(last_block);
100
101                        // this is okay because we will only enter this state when the filter is
102                        // paginatable i.e. from block is set
103                        let from_block = self.filter.get_from_block().unwrap();
104                        let to_block = from_block + self.page_size;
105                        self.from_block = Some(to_block + 1);
106
107                        let filter = self.filter.clone().from_block(from_block).to_block(to_block);
108                        let provider = self.provider;
109                        // load first page of logs
110                        #[allow(clippy::redundant_async_block)]
111                        let fut = Box::pin(async move { provider.get_logs(&filter).await });
112                        rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
113                    }
114                    Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))),
115                }
116            }
117            LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) {
118                Ok(logs) => {
119                    self.current_logs = VecDeque::from(logs);
120                    rewake_with_new_state!(ctx, self, LogQueryState::Consume);
121                }
122                Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))),
123            },
124            LogQueryState::Consume => {
125                let log = self.current_logs.pop_front();
126                if log.is_none() {
127                    // consumed all the logs
128                    if !self.filter.is_paginatable() {
129                        Poll::Ready(None)
130                    } else {
131                        // load new logs if there are still more pages to go through
132                        // can safely assume this will always be set in this state
133                        let from_block = self.from_block.unwrap();
134                        let to_block = from_block + self.page_size;
135
136                        // no more pages to load, and everything is consumed
137                        // can safely assume this will always be set in this state
138                        if from_block > self.last_block.unwrap() {
139                            return Poll::Ready(None)
140                        }
141                        // load next page
142                        self.from_block = Some(to_block + 1);
143
144                        let filter = self.filter.clone().from_block(from_block).to_block(to_block);
145                        let provider = self.provider;
146                        #[allow(clippy::redundant_async_block)]
147                        let fut = Box::pin(async move { provider.get_logs(&filter).await });
148                        rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut));
149                    }
150                } else {
151                    Poll::Ready(log.map(Ok))
152                }
153            }
154        }
155    }
156}