ethers_providers/stream/
watcher.rs1use crate::{
2 utils::{interval, PinBoxFut},
3 JsonRpcClient, Middleware, Provider,
4};
5use ethers_core::types::U256;
6use futures_core::stream::Stream;
7use futures_util::StreamExt;
8use pin_project::pin_project;
9use serde::{de::DeserializeOwned, Serialize};
10use std::{
11 fmt::Debug,
12 pin::Pin,
13 task::{Context, Poll},
14 time::Duration,
15 vec::IntoIter,
16};
17
/// Polling interval a [`FilterWatcher`] starts with (7 seconds) unless it is
/// replaced through [`FilterWatcher::interval`].
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(7);

/// A much shorter polling interval (100 ms). Going by its name it is meant for
/// local endpoints; nothing in this part of the file references it.
pub const DEFAULT_LOCAL_POLL_INTERVAL: Duration = Duration::from_millis(100);
23
24enum FilterWatcherState<'a, R> {
25 WaitForInterval,
26 GetFilterChanges(PinBoxFut<'a, Vec<R>>),
27 NextItem(IntoIter<R>),
28}
29
30#[must_use = "filters do nothing unless you stream them"]
31#[pin_project]
33pub struct FilterWatcher<'a, P, R> {
34 pub id: U256,
36
37 pub(crate) provider: &'a Provider<P>,
38
39 interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
41 state: FilterWatcherState<'a, R>,
43}
44
45impl<'a, P, R> FilterWatcher<'a, P, R>
46where
47 P: JsonRpcClient,
48 R: Send + Sync + DeserializeOwned,
49{
50 pub fn new<T: Into<U256>>(id: T, provider: &'a Provider<P>) -> Self {
52 Self {
53 id: id.into(),
54 interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
55 state: FilterWatcherState::WaitForInterval,
56 provider,
57 }
58 }
59
60 pub fn interval(mut self, duration: Duration) -> Self {
62 self.interval = Box::new(interval(duration));
63 self
64 }
65
66 pub fn stream(self) -> Pin<Box<Self>> {
69 Box::pin(self)
70 }
71}
72
73impl<'a, P, R> Stream for FilterWatcher<'a, P, R>
75where
76 P: JsonRpcClient,
77 R: Serialize + Send + Sync + DeserializeOwned + Debug + 'a,
78{
79 type Item = R;
80
81 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
82 let mut this = self.project();
83 let id = *this.id;
84
85 loop {
86 *this.state = match &mut this.state {
87 FilterWatcherState::WaitForInterval => {
88 let _ready = futures_util::ready!(this.interval.poll_next_unpin(cx));
90 let fut = Box::pin(this.provider.get_filter_changes(id));
91 FilterWatcherState::GetFilterChanges(fut)
92 }
93 FilterWatcherState::GetFilterChanges(fut) => {
94 let items: Vec<R> =
99 futures_util::ready!(fut.as_mut().poll(cx)).unwrap_or_default();
100 FilterWatcherState::NextItem(items.into_iter())
101 }
102 FilterWatcherState::NextItem(iter) => {
107 if let item @ Some(_) = iter.next() {
108 cx.waker().wake_by_ref();
109 return Poll::Ready(item)
110 }
111 FilterWatcherState::WaitForInterval
112 }
113 };
114 }
115 }
116}