alloy_rpc_client/
poller.rs

1use crate::WeakClient;
2use alloy_json_rpc::{RpcError, RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use async_stream::stream;
5use futures::{Stream, StreamExt};
6use serde::Serialize;
7use serde_json::value::RawValue;
8use std::{
9    borrow::Cow,
10    marker::PhantomData,
11    ops::{Deref, DerefMut},
12    time::Duration,
13};
14use tokio::sync::broadcast;
15use tokio_stream::wrappers::BroadcastStream;
16use tracing_futures::Instrument;
17
18#[cfg(target_arch = "wasm32")]
19use wasmtimer::tokio::sleep;
20
21#[cfg(not(target_arch = "wasm32"))]
22use tokio::time::sleep;
23
/// Maximum number of times the poller re-sends a request after a recoverable transport error
/// before it gives up and stops polling.
const MAX_RETRIES: usize = 3;
26
27/// A poller task builder.
28///
29/// This builder is used to create a poller task that repeatedly polls a method on a client and
30/// sends the responses to a channel. By default, this is done every 10 seconds, with a channel size
31/// of 16, and no limit on the number of successful polls. This is all configurable.
32///
33/// The builder is consumed using the [`spawn`](Self::spawn) method, which returns a channel to
34/// receive the responses. The task will continue to poll until either the client or the channel is
35/// dropped.
36///
37/// The channel can be converted into a stream using the [`into_stream`](PollChannel::into_stream)
38/// method.
39///
40/// Alternatively, [`into_stream`](Self::into_stream) can be used to directly return a stream of
41/// responses on the current thread. This is currently equivalent to `spawn().into_stream()`, but
42/// this may change in the future.
43///
44/// # Examples
45///
46/// Poll `eth_blockNumber` every 5 seconds:
47///
48/// ```no_run
49/// # async fn example(client: alloy_rpc_client::RpcClient) -> Result<(), Box<dyn std::error::Error>> {
50/// use alloy_primitives::U64;
51/// use alloy_rpc_client::PollerBuilder;
52/// use futures_util::StreamExt;
53///
54/// let poller: PollerBuilder<alloy_rpc_client::NoParams, U64> = client
55///     .prepare_static_poller("eth_blockNumber", [])
56///     .with_poll_interval(std::time::Duration::from_secs(5));
57/// let mut stream = poller.into_stream();
58/// while let Some(block_number) = stream.next().await {
59///    println!("polled block number: {block_number}");
60/// }
61/// # Ok(())
62/// # }
63/// ```
64// TODO: make this be able to be spawned on the current thread instead of forcing a task.
65#[derive(Debug)]
66#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
67pub struct PollerBuilder<Params, Resp> {
68    /// The client to poll with.
69    client: WeakClient,
70
71    /// Request Method
72    method: Cow<'static, str>,
73    params: Params,
74
75    // config options
76    channel_size: usize,
77    poll_interval: Duration,
78    limit: usize,
79
80    _pd: PhantomData<fn() -> Resp>,
81}
82
83impl<Params, Resp> PollerBuilder<Params, Resp>
84where
85    Params: RpcSend + 'static,
86    Resp: RpcRecv + Clone,
87{
88    /// Create a new poller task.
89    pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
90        let poll_interval =
91            client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
92        Self {
93            client,
94            method: method.into(),
95            params,
96            channel_size: 16,
97            poll_interval,
98            limit: usize::MAX,
99            _pd: PhantomData,
100        }
101    }
102
103    /// Returns the channel size for the poller task.
104    pub const fn channel_size(&self) -> usize {
105        self.channel_size
106    }
107
108    /// Sets the channel size for the poller task.
109    pub fn set_channel_size(&mut self, channel_size: usize) {
110        self.channel_size = channel_size;
111    }
112
113    /// Sets the channel size for the poller task.
114    pub fn with_channel_size(mut self, channel_size: usize) -> Self {
115        self.set_channel_size(channel_size);
116        self
117    }
118
119    /// Returns the limit on the number of successful polls.
120    pub const fn limit(&self) -> usize {
121        self.limit
122    }
123
124    /// Sets a limit on the number of successful polls.
125    pub fn set_limit(&mut self, limit: Option<usize>) {
126        self.limit = limit.unwrap_or(usize::MAX);
127    }
128
129    /// Sets a limit on the number of successful polls.
130    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
131        self.set_limit(limit);
132        self
133    }
134
135    /// Returns the duration between polls.
136    pub const fn poll_interval(&self) -> Duration {
137        self.poll_interval
138    }
139
140    /// Sets the duration between polls.
141    pub fn set_poll_interval(&mut self, poll_interval: Duration) {
142        self.poll_interval = poll_interval;
143    }
144
145    /// Sets the duration between polls.
146    pub fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
147        self.set_poll_interval(poll_interval);
148        self
149    }
150
151    /// Starts the poller in a new task, returning a channel to receive the responses on.
152    pub fn spawn(self) -> PollChannel<Resp> {
153        let (tx, rx) = broadcast::channel(self.channel_size);
154        self.into_future(tx).spawn_task();
155        rx.into()
156    }
157
158    async fn into_future(self, tx: broadcast::Sender<Resp>) {
159        let mut stream = self.into_stream();
160        while let Some(resp) = stream.next().await {
161            if tx.send(resp).is_err() {
162                debug!("channel closed");
163                break;
164            }
165        }
166    }
167
168    /// Starts the poller and returns the stream of responses.
169    ///
170    /// Note that this does not spawn the poller on a separate task, thus all responses will be
171    /// polled on the current thread once this stream is polled.
172    pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
173        Box::pin(self.into_local_stream())
174    }
175
176    fn into_local_stream(self) -> impl Stream<Item = Resp> {
177        let span = debug_span!("poller", method = %self.method);
178        stream! {
179        let mut params = ParamsOnce::Typed(self.params);
180        let mut retries = MAX_RETRIES;
181        'outer: for _ in 0..self.limit {
182            let Some(client) = self.client.upgrade() else {
183                debug!("client dropped");
184                break;
185            };
186
187            // Avoid serializing the params more than once.
188            let params = match params.get() {
189                Ok(p) => p,
190                Err(err) => {
191                    error!(%err, "failed to serialize params");
192                    break;
193                }
194            };
195
196            loop {
197                trace!("polling");
198                match client.request(self.method.clone(), params).await {
199                    Ok(resp) => yield resp,
200                    Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
201                        debug!(%err, "failed to poll, retrying");
202                        retries -= 1;
203                        continue;
204                    }
205                    Err(err) => {
206                        error!(%err, "failed to poll");
207                        break 'outer;
208                    }
209                }
210                break;
211            }
212
213            trace!(duration=?self.poll_interval, "sleeping");
214            sleep(self.poll_interval).await;
215        }
216        }
217        .instrument(span)
218    }
219}
220
221/// A channel yielding responses from a poller task.
222///
223/// This stream is backed by a coroutine, and will continue to produce responses
224/// until the poller task is dropped. The poller task is dropped when all
225/// [`RpcClient`] instances are dropped, or when all listening `PollChannel` are
226/// dropped.
227///
228/// The poller task also ignores errors from the server and deserialization
229/// errors, and will continue to poll until the client is dropped.
230///
231/// [`RpcClient`]: crate::RpcClient
232#[derive(Debug)]
233pub struct PollChannel<Resp> {
234    rx: broadcast::Receiver<Resp>,
235}
236
237impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
238    fn from(rx: broadcast::Receiver<Resp>) -> Self {
239        Self { rx }
240    }
241}
242
243impl<Resp> Deref for PollChannel<Resp> {
244    type Target = broadcast::Receiver<Resp>;
245
246    fn deref(&self) -> &Self::Target {
247        &self.rx
248    }
249}
250
251impl<Resp> DerefMut for PollChannel<Resp> {
252    fn deref_mut(&mut self) -> &mut Self::Target {
253        &mut self.rx
254    }
255}
256
257impl<Resp> PollChannel<Resp>
258where
259    Resp: RpcRecv + Clone,
260{
261    /// Resubscribe to the poller task.
262    pub fn resubscribe(&self) -> Self {
263        Self { rx: self.rx.resubscribe() }
264    }
265
266    /// Converts the poll channel into a stream.
267    // TODO: can we name this type?
268    pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
269        self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
270    }
271
272    /// Converts the poll channel into a stream that also yields
273    /// [lag errors](tokio_stream::wrappers::errors::BroadcastStreamRecvError).
274    pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
275        self.rx.into()
276    }
277}
278
/// Holder for request parameters that serializes them only once.
///
/// Starts out as [`Typed`](Self::Typed) and is replaced by [`Serialized`](Self::Serialized)
/// the first time the raw JSON is requested.
enum ParamsOnce<P> {
    /// The parameters as provided by the caller, not serialized yet.
    Typed(P),
    /// The parameters after serialization to raw JSON.
    Serialized(Box<RawValue>),
}
284
285impl<P: Serialize> ParamsOnce<P> {
286    #[inline]
287    fn get(&mut self) -> serde_json::Result<&RawValue> {
288        match self {
289            Self::Typed(_) => self.init(),
290            Self::Serialized(p) => Ok(p),
291        }
292    }
293
294    #[cold]
295    fn init(&mut self) -> serde_json::Result<&RawValue> {
296        let Self::Typed(p) = self else { unreachable!() };
297        let v = serde_json::value::to_raw_value(p)?;
298        *self = Self::Serialized(v);
299        let Self::Serialized(v) = self else { unreachable!() };
300        Ok(v)
301    }
302}
303
/// Compile-time check that `PollChannel` is `Unpin`; never meant to be called.
#[cfg(test)]
// The function only exists to be type-checked, so making it `const` would add nothing.
#[allow(clippy::missing_const_for_fn)]
fn _assert_unpin() {
    fn _requires_unpin<T>()
    where
        T: Unpin,
    {
    }
    _requires_unpin::<PollChannel<()>>();
}