1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
use crate::{
    utils::{interval, PinBoxFut},
    JsonRpcClient, Middleware, Provider,
};
use ethers_core::types::U256;
use futures_core::stream::Stream;
use futures_util::StreamExt;
use pin_project::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{
    fmt::Debug,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
    vec::IntoIter,
};

/// Polling period used by default for filters and pending transactions (7 seconds)
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(7);

/// Polling period used for local endpoints (100 milliseconds), see
/// [`crate::is_local_endpoint()`]
pub const DEFAULT_LOCAL_POLL_INTERVAL: Duration = Duration::from_millis(100);

/// The phases of the polling state machine that drives a `FilterWatcher` stream.
enum FilterWatcherState<'a, R> {
    /// Idle: waiting for the polling interval to produce its next tick.
    WaitForInterval,
    /// A request for the filter's changes is in flight; holds the pending future.
    GetFilterChanges(PinBoxFut<'a, Vec<R>>),
    /// Items from the most recent response that have not been yielded yet.
    NextItem(IntoIter<R>),
}

#[must_use = "filters do nothing unless you stream them"]
/// Streams data from an installed filter via `eth_getFilterChanges`.
///
/// The watcher itself is inert: changes are only requested while it is being polled
/// as a `Stream`, paced by its polling interval.
#[pin_project]
pub struct FilterWatcher<'a, P, R> {
    /// The filter's installed id on the ethereum node
    pub id: U256,

    /// The provider through which the filter's changes are requested
    pub(crate) provider: &'a Provider<P>,

    /// The polling interval: each tick of this stream triggers one request for the
    /// filter's changes
    interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
    /// State machine driven by the `Stream` impl
    state: FilterWatcherState<'a, R>,
}

impl<'a, P, R> FilterWatcher<'a, P, R>
where
    P: JsonRpcClient,
    R: Send + Sync + DeserializeOwned,
{
    /// Creates a new watcher for the filter with the given `id`, using `provider` to
    /// request its changes.
    ///
    /// The watcher starts out waiting for the first tick of a
    /// [`DEFAULT_POLL_INTERVAL`] interval; use [`Self::interval`] to pick another period.
    pub fn new<T: Into<U256>>(id: T, provider: &'a Provider<P>) -> Self {
        let id = id.into();
        Self {
            id,
            provider,
            interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
            state: FilterWatcherState::WaitForInterval,
        }
    }

    /// Replaces the stream's polling interval with one of the given `duration` and
    /// returns the updated watcher (builder style).
    pub fn interval(mut self, duration: Duration) -> Self {
        let ticker = interval(duration);
        self.interval = Box::new(ticker);
        self
    }

    /// Moves the watcher onto the heap and pins it, which is required before `next`
    /// can be called on it. Shorthand for `Box::pin`.
    pub fn stream(self) -> Pin<Box<Self>> {
        Box::pin(self)
    }
}

// Drives the filter's polling state machine, handing out one item per `Ready` result
impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where
    P: JsonRpcClient,
    R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a,
{
    type Item = R;

    /// Polls the watcher for its next item.
    ///
    /// The state machine cycles through three phases: wait for the polling interval to
    /// tick, request the filter's changes from the provider, then yield the received
    /// items one at a time. Once the batch is exhausted it goes back to waiting.
    /// This implementation never returns `Poll::Ready(None)`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        let filter_id = *this.id;

        loop {
            let next_state = match &mut this.state {
                FilterWatcherState::WaitForInterval => {
                    // Stay idle until the polling period has elapsed; the value of the
                    // tick itself is not inspected.
                    if this.interval.poll_next_unpin(cx).is_pending() {
                        return Poll::Pending
                    }
                    FilterWatcherState::GetFilterChanges(Box::pin(
                        this.provider.get_filter_changes(filter_id),
                    ))
                }
                FilterWatcherState::GetFilterChanges(fut) => {
                    // NOTE: A provider error is swallowed here and treated as an empty
                    // batch. Should this yield a Result instead? In a streamed loop we
                    // ideally would not want a (possibly temporary) error to end the loop.
                    let items: Vec<R> = match fut.as_mut().poll(cx) {
                        Poll::Ready(response) => response.unwrap_or_default(),
                        Poll::Pending => return Poll::Pending,
                    };
                    FilterWatcherState::NextItem(items.into_iter())
                }
                // Hand out a single element of the batch. While elements remain, the
                // next call lands in this arm again instead of requesting filter changes.
                // Only when the batch is used up does the watcher wait for the interval
                // and poll for new logs.
                FilterWatcherState::NextItem(iter) => match iter.next() {
                    Some(item) => {
                        cx.waker().wake_by_ref();
                        return Poll::Ready(Some(item))
                    }
                    None => FilterWatcherState::WaitForInterval,
                },
            };
            *this.state = next_state;
        }
    }
}