1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
use crate::{JsonRpcClient, Middleware, PinBoxFut, Provider};
use ethers_core::types::U256;
use futures_core::stream::Stream;
use futures_timer::Delay;
use futures_util::{stream, FutureExt, StreamExt};
use pin_project::pin_project;
use serde::{de::DeserializeOwned, Serialize};
use std::{
fmt::Debug,
pin::Pin,
task::{Context, Poll},
time::Duration,
vec::IntoIter,
};
pub fn interval(duration: Duration) -> impl Stream<Item = ()> + Send + Unpin {
stream::unfold((), move |_| Delay::new(duration).map(|_| Some(((), ())))).map(drop)
}
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(7000);
enum FilterWatcherState<'a, R> {
WaitForInterval,
GetFilterChanges(PinBoxFut<'a, Vec<R>>),
NextItem(IntoIter<R>),
}
#[must_use = "filters do nothing unless you stream them"]
#[pin_project]
pub struct FilterWatcher<'a, P, R> {
pub id: U256,
provider: &'a Provider<P>,
interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
state: FilterWatcherState<'a, R>,
}
impl<'a, P, R> FilterWatcher<'a, P, R>
where
P: JsonRpcClient,
R: Send + Sync + DeserializeOwned,
{
pub fn new<T: Into<U256>>(id: T, provider: &'a Provider<P>) -> Self {
Self {
id: id.into(),
interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
state: FilterWatcherState::WaitForInterval,
provider,
}
}
pub fn interval(mut self, duration: Duration) -> Self {
self.interval = Box::new(interval(duration));
self
}
pub fn stream(self) -> Pin<Box<Self>> {
Box::pin(self)
}
}
impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
where
P: JsonRpcClient,
R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a,
{
type Item = R;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let id = *this.id;
*this.state = match this.state {
FilterWatcherState::WaitForInterval => {
let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));
cx.waker().wake_by_ref();
let fut = Box::pin(this.provider.get_filter_changes(id));
FilterWatcherState::GetFilterChanges(fut)
}
FilterWatcherState::GetFilterChanges(fut) => {
let items: Vec<R> = futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
cx.waker().wake_by_ref();
FilterWatcherState::NextItem(items.into_iter())
}
FilterWatcherState::NextItem(iter) => {
cx.waker().wake_by_ref();
match iter.next() {
Some(item) => return Poll::Ready(Some(item)),
None => FilterWatcherState::WaitForInterval,
}
}
};
Poll::Pending
}
}