ethers_providers/stream/
watcher.rs

1use crate::{
2    utils::{interval, PinBoxFut},
3    JsonRpcClient, Middleware, Provider,
4};
5use ethers_core::types::U256;
6use futures_core::stream::Stream;
7use futures_util::StreamExt;
8use pin_project::pin_project;
9use serde::{de::DeserializeOwned, Serialize};
10use std::{
11    fmt::Debug,
12    pin::Pin,
13    task::{Context, Poll},
14    time::Duration,
15    vec::IntoIter,
16};
17
/// Polling interval applied by default to filters and pending transactions
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(7);

/// Polling interval to use for local endpoints, see [`crate::is_local_endpoint()`]
pub const DEFAULT_LOCAL_POLL_INTERVAL: Duration = Duration::from_millis(100);
23
24enum FilterWatcherState<'a, R> {
25    WaitForInterval,
26    GetFilterChanges(PinBoxFut<'a, Vec<R>>),
27    NextItem(IntoIter<R>),
28}
29
30#[must_use = "filters do nothing unless you stream them"]
31/// Streams data from an installed filter via `eth_getFilterChanges`
32#[pin_project]
33pub struct FilterWatcher<'a, P, R> {
34    /// The filter's installed id on the ethereum node
35    pub id: U256,
36
37    pub(crate) provider: &'a Provider<P>,
38
39    // The polling interval
40    interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
41    /// statemachine driven by the Stream impl
42    state: FilterWatcherState<'a, R>,
43}
44
45impl<'a, P, R> FilterWatcher<'a, P, R>
46where
47    P: JsonRpcClient,
48    R: Send + Sync + DeserializeOwned,
49{
50    /// Creates a new watcher with the provided factory and filter id.
51    pub fn new<T: Into<U256>>(id: T, provider: &'a Provider<P>) -> Self {
52        Self {
53            id: id.into(),
54            interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
55            state: FilterWatcherState::WaitForInterval,
56            provider,
57        }
58    }
59
60    /// Sets the stream's polling interval
61    pub fn interval(mut self, duration: Duration) -> Self {
62        self.interval = Box::new(interval(duration));
63        self
64    }
65
66    /// Alias for Box::pin, must be called in order to pin the stream and be able
67    /// to call `next` on it.
68    pub fn stream(self) -> Pin<Box<Self>> {
69        Box::pin(self)
70    }
71}
72
73// Advances the filter's state machine
74impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
75where
76    P: JsonRpcClient,
77    R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a,
78{
79    type Item = R;
80
81    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
82        let mut this = self.project();
83        let id = *this.id;
84
85        loop {
86            *this.state = match &mut this.state {
87                FilterWatcherState::WaitForInterval => {
88                    // Wait the polling period
89                    let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));
90                    let fut = Box::pin(this.provider.get_filter_changes(id));
91                    FilterWatcherState::GetFilterChanges(fut)
92                }
93                FilterWatcherState::GetFilterChanges(fut) => {
94                    // NOTE: If the provider returns an error, this will return an empty
95                    // vector. Should we make this return a Result instead? Ideally if we're
96                    // in a streamed loop we wouldn't want the loop to terminate if an error
97                    // is encountered (since it might be a temporary error).
98                    let items: Vec<R> =
99                        futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
100                    FilterWatcherState::NextItem(items.into_iter())
101                }
102                // Consume 1 element from the vector. If more elements are in the vector,
103                // the next call will immediately go to this branch instead of trying to get
104                // filter changes again. Once the whole vector is consumed, it will poll again
105                // for new logs
106                FilterWatcherState::NextItem(iter) => {
107                    if let item @ Some(_) = iter.next() {
108                        cx.waker().wake_by_ref();
109                        return Poll::Ready(item)
110                    }
111                    FilterWatcherState::WaitForInterval
112                }
113            };
114        }
115    }
116}