alloy_rpc_client/
poller.rs

1use crate::WeakClient;
2use alloy_json_rpc::{RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use futures::{ready, stream::FusedStream, Future, FutureExt, Stream, StreamExt};
5use serde::Serialize;
6use serde_json::value::RawValue;
7use std::{
8    borrow::Cow,
9    marker::PhantomData,
10    ops::{Deref, DerefMut},
11    pin::Pin,
12    task::{Context, Poll},
13    time::Duration,
14};
15use tokio::sync::broadcast;
16use tokio_stream::wrappers::BroadcastStream;
17use tracing::Span;
18
19#[cfg(target_family = "wasm")]
20use wasmtimer::tokio::{sleep, Sleep};
21
22#[cfg(not(target_family = "wasm"))]
23use tokio::time::{sleep, Sleep};
24
25/// A poller task builder.
26///
27/// This builder is used to create a poller task that repeatedly polls a method on a client and
28/// sends the responses to a channel. By default, this is done every 10 seconds, with a channel size
29/// of 16, and no limit on the number of successful polls. This is all configurable.
30///
31/// The builder is consumed using the [`spawn`](Self::spawn) method, which returns a channel to
32/// receive the responses. The task will continue to poll until either the client or the channel is
33/// dropped.
34///
35/// The channel can be converted into a stream using the [`into_stream`](PollChannel::into_stream)
36/// method.
37///
38/// Alternatively, [`into_stream`](Self::into_stream) on the builder can be used to directly return
39/// a stream of responses on the current thread, instead of spawning a task.
40///
41/// # Examples
42///
43/// Poll `eth_blockNumber` every 5 seconds:
44///
45/// ```no_run
46/// # async fn example(client: alloy_rpc_client::RpcClient) -> Result<(), Box<dyn std::error::Error>> {
47/// use alloy_primitives::U64;
48/// use alloy_rpc_client::PollerBuilder;
49/// use futures_util::StreamExt;
50///
51/// let poller: PollerBuilder<(), U64> = client
52///     .prepare_static_poller("eth_blockNumber", ())
53///     .with_poll_interval(std::time::Duration::from_secs(5));
54/// let mut stream = poller.into_stream();
55/// while let Some(block_number) = stream.next().await {
56///    println!("polled block number: {block_number}");
57/// }
58/// # Ok(())
59/// # }
60/// ```
61#[derive(Debug)]
62#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
63pub struct PollerBuilder<Params, Resp> {
64    /// The client to poll with.
65    client: WeakClient,
66
67    /// Request Method
68    method: Cow<'static, str>,
69    params: Params,
70
71    // config options
72    channel_size: usize,
73    poll_interval: Duration,
74    limit: usize,
75
76    _pd: PhantomData<fn() -> Resp>,
77}
78
79impl<Params, Resp> PollerBuilder<Params, Resp>
80where
81    Params: RpcSend + 'static,
82    Resp: RpcRecv,
83{
84    /// Create a new poller task.
85    pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
86        let poll_interval =
87            client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
88        Self {
89            client,
90            method: method.into(),
91            params,
92            channel_size: 16,
93            poll_interval,
94            limit: usize::MAX,
95            _pd: PhantomData,
96        }
97    }
98
99    /// Returns the channel size for the poller task.
100    pub const fn channel_size(&self) -> usize {
101        self.channel_size
102    }
103
104    /// Sets the channel size for the poller task.
105    pub const fn set_channel_size(&mut self, channel_size: usize) {
106        self.channel_size = channel_size;
107    }
108
109    /// Sets the channel size for the poller task.
110    pub const fn with_channel_size(mut self, channel_size: usize) -> Self {
111        self.set_channel_size(channel_size);
112        self
113    }
114
115    /// Returns the limit on the number of successful polls.
116    pub const fn limit(&self) -> usize {
117        self.limit
118    }
119
120    /// Sets a limit on the number of successful polls.
121    pub fn set_limit(&mut self, limit: Option<usize>) {
122        self.limit = limit.unwrap_or(usize::MAX);
123    }
124
125    /// Sets a limit on the number of successful polls.
126    pub fn with_limit(mut self, limit: Option<usize>) -> Self {
127        self.set_limit(limit);
128        self
129    }
130
131    /// Returns the duration between polls.
132    pub const fn poll_interval(&self) -> Duration {
133        self.poll_interval
134    }
135
136    /// Sets the duration between polls.
137    pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
138        self.poll_interval = poll_interval;
139    }
140
141    /// Sets the duration between polls.
142    pub const fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
143        self.set_poll_interval(poll_interval);
144        self
145    }
146
147    /// Starts the poller in a new task, returning a channel to receive the responses on.
148    pub fn spawn(self) -> PollChannel<Resp>
149    where
150        Resp: Clone,
151    {
152        let (tx, rx) = broadcast::channel(self.channel_size);
153        self.into_future(tx).spawn_task();
154        rx.into()
155    }
156
157    async fn into_future(self, tx: broadcast::Sender<Resp>)
158    where
159        Resp: Clone,
160    {
161        let mut stream = self.into_stream();
162        while let Some(resp) = stream.next().await {
163            if tx.send(resp).is_err() {
164                debug!("channel closed");
165                break;
166            }
167        }
168    }
169
170    /// Starts the poller and returns the stream of responses.
171    ///
172    /// Note that this does not spawn the poller on a separate task, thus all responses will be
173    /// polled on the current thread once this stream is polled.
174    pub fn into_stream(self) -> PollerStream<Resp> {
175        PollerStream::new(self)
176    }
177
178    /// Returns the [`WeakClient`] associated with the poller.
179    pub fn client(&self) -> WeakClient {
180        self.client.clone()
181    }
182}
183
184/// State for the polling stream.
185enum PollState<Resp> {
186    /// Poller is paused
187    Paused,
188    /// Waiting to start the next poll.
189    Waiting,
190    /// Currently polling for a response.
191    Polling(
192        alloy_transport::Pbf<
193            'static,
194            Resp,
195            alloy_transport::RpcError<alloy_transport::TransportErrorKind>,
196        >,
197    ),
198    /// Sleeping between polls.
199    Sleeping(Pin<Box<Sleep>>),
200
201    /// Polling has finished due to an error.
202    Finished,
203}
204
205/// A stream of responses from polling an RPC method.
206///
207/// This stream polls the given RPC method at the specified interval and yields the responses.
208///
209/// # Examples
210///
211/// ```no_run
212/// # async fn example(client: alloy_rpc_client::RpcClient) -> Result<(), Box<dyn std::error::Error>> {
213/// use alloy_primitives::U64;
214/// use futures_util::StreamExt;
215///
216/// // Create a poller that fetches block numbers
217/// let poller = client
218///     .prepare_static_poller("eth_blockNumber", ())
219///     .with_poll_interval(std::time::Duration::from_secs(1));
220///
221/// // Convert the block number to a more useful format
222/// let mut stream = poller.into_stream().map(|block_num: U64| block_num.to::<u64>());
223///
224/// while let Some(block_number) = stream.next().await {
225///     println!("Current block: {}", block_number);
226/// }
227/// # Ok(())
228/// # }
229/// ```
230pub struct PollerStream<Resp, Output = Resp, Map = fn(Resp) -> Output> {
231    client: WeakClient,
232    method: Cow<'static, str>,
233    params: Box<RawValue>,
234    poll_interval: Duration,
235    limit: usize,
236    poll_count: usize,
237    state: PollState<Resp>,
238    span: Span,
239    map: Map,
240    _pd: PhantomData<fn() -> Output>,
241}
242
243impl<Resp, Output, Map> std::fmt::Debug for PollerStream<Resp, Output, Map> {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        f.debug_struct("PollerStream")
246            .field("method", &self.method)
247            .field("poll_interval", &self.poll_interval)
248            .field("limit", &self.limit)
249            .field("poll_count", &self.poll_count)
250            .finish_non_exhaustive()
251    }
252}
253
254impl<Resp> PollerStream<Resp> {
255    fn new<Params: Serialize>(builder: PollerBuilder<Params, Resp>) -> Self {
256        let span = debug_span!("poller", method = %builder.method);
257
258        // Serialize params once
259        let params = serde_json::value::to_raw_value(&builder.params).unwrap_or_else(|err| {
260            error!(%err, "failed to serialize params during initialization");
261            // Return empty params, stream will terminate on first poll
262            Box::<RawValue>::default()
263        });
264
265        Self {
266            client: builder.client,
267            method: builder.method,
268            params,
269            poll_interval: builder.poll_interval,
270            limit: builder.limit,
271            poll_count: 0,
272            state: PollState::Waiting,
273            span,
274            map: std::convert::identity,
275            _pd: PhantomData,
276        }
277    }
278
279    /// Get a reference to the [`WeakClient`] used by this poller.
280    pub fn client(&self) -> WeakClient {
281        self.client.clone()
282    }
283
284    /// Pauses the poller until it's unpaused.
285    ///
286    /// While paused the poller will not initiate new rpc requests
287    pub fn pause(&mut self) {
288        self.state = PollState::Paused;
289    }
290
291    /// Unpauses the poller.
292    ///
293    /// The poller will initiate new rpc requests once polled.
294    pub fn unpause(&mut self) {
295        if matches!(self.state, PollState::Paused) {
296            self.state = PollState::Waiting;
297        }
298    }
299}
300
301impl<Resp, Output, Map> PollerStream<Resp, Output, Map>
302where
303    Map: Fn(Resp) -> Output,
304{
305    /// Maps the responses using the provided function.
306    pub fn map<NewOutput, NewMap>(self, map: NewMap) -> PollerStream<Resp, NewOutput, NewMap>
307    where
308        NewMap: Fn(Resp) -> NewOutput,
309    {
310        PollerStream {
311            client: self.client,
312            method: self.method,
313            params: self.params,
314            poll_interval: self.poll_interval,
315            limit: self.limit,
316            poll_count: self.poll_count,
317            state: self.state,
318            span: self.span,
319            map,
320            _pd: PhantomData,
321        }
322    }
323}
324
325impl<Resp, Output, Map> Stream for PollerStream<Resp, Output, Map>
326where
327    Resp: RpcRecv + 'static,
328    Map: Fn(Resp) -> Output + Unpin,
329{
330    type Item = Output;
331
332    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333        let this = self.get_mut();
334        let _guard = this.span.enter();
335
336        loop {
337            match &mut this.state {
338                PollState::Paused => return Poll::Pending,
339                PollState::Waiting => {
340                    // Check if we've reached the limit
341                    if this.poll_count >= this.limit {
342                        debug!("poll limit reached");
343                        this.state = PollState::Finished;
344                        continue;
345                    }
346
347                    // Check if client is still alive
348                    let Some(client) = this.client.upgrade() else {
349                        debug!("client dropped");
350                        this.state = PollState::Finished;
351                        continue;
352                    };
353
354                    // Start polling
355                    trace!("polling");
356                    let method = this.method.clone();
357                    let params = this.params.clone();
358                    let fut = Box::pin(async move { client.request(method, params).await });
359                    this.state = PollState::Polling(fut);
360                }
361                PollState::Polling(fut) => {
362                    match ready!(fut.poll_unpin(cx)) {
363                        Ok(resp) => {
364                            this.poll_count += 1;
365                            // Start sleeping before next poll
366                            trace!(duration=?this.poll_interval, "sleeping");
367                            let sleep = Box::pin(sleep(this.poll_interval));
368                            this.state = PollState::Sleeping(sleep);
369                            return Poll::Ready(Some((this.map)(resp)));
370                        }
371                        Err(err) => {
372                            error!(%err, "failed to poll");
373
374                            // If the error is a filter not found error, stop
375                            // the poller. Error codes are not consistent
376                            // across reth/geth/nethermind, so we check
377                            // just the message.
378                            if let Some(resp) = err.as_error_resp() {
379                                if resp.message.contains("filter not found") {
380                                    warn!("server has dropped the filter, stopping poller");
381                                    this.state = PollState::Finished;
382                                    continue;
383                                }
384                            }
385
386                            // Start sleeping before retry
387                            trace!(duration=?this.poll_interval, "sleeping after error");
388
389                            let sleep = Box::pin(sleep(this.poll_interval));
390                            this.state = PollState::Sleeping(sleep);
391                        }
392                    }
393                }
394                PollState::Sleeping(sleep) => {
395                    ready!(sleep.as_mut().poll(cx));
396                    this.state = PollState::Waiting;
397                }
398                PollState::Finished => {
399                    return Poll::Ready(None);
400                }
401            }
402        }
403    }
404}
405
406impl<Resp, Output, Map> FusedStream for PollerStream<Resp, Output, Map>
407where
408    Resp: RpcRecv + 'static,
409    Map: Fn(Resp) -> Output + Unpin,
410{
411    fn is_terminated(&self) -> bool {
412        matches!(self.state, PollState::Finished)
413    }
414}
415
416/// A channel yielding responses from a poller task.
417///
418/// This stream is backed by a coroutine, and will continue to produce responses
419/// until the poller task is dropped. The poller task is dropped when all
420/// [`RpcClient`] instances are dropped, or when all listening `PollChannel` are
421/// dropped.
422///
423/// The poller task also ignores errors from the server and deserialization
424/// errors, and will continue to poll until the client is dropped.
425///
426/// [`RpcClient`]: crate::RpcClient
427#[derive(Debug)]
428pub struct PollChannel<Resp> {
429    rx: broadcast::Receiver<Resp>,
430}
431
432impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
433    fn from(rx: broadcast::Receiver<Resp>) -> Self {
434        Self { rx }
435    }
436}
437
438impl<Resp> Deref for PollChannel<Resp> {
439    type Target = broadcast::Receiver<Resp>;
440
441    fn deref(&self) -> &Self::Target {
442        &self.rx
443    }
444}
445
446impl<Resp> DerefMut for PollChannel<Resp> {
447    fn deref_mut(&mut self) -> &mut Self::Target {
448        &mut self.rx
449    }
450}
451
452impl<Resp> PollChannel<Resp>
453where
454    Resp: RpcRecv + Clone,
455{
456    /// Resubscribe to the poller task.
457    pub fn resubscribe(&self) -> Self {
458        Self { rx: self.rx.resubscribe() }
459    }
460
461    /// Converts the poll channel into a stream.
462    pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
463        self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
464    }
465
466    /// Converts the poll channel into a stream that also yields
467    /// [lag errors](tokio_stream::wrappers::errors::BroadcastStreamRecvError).
468    pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
469        self.rx.into()
470    }
471}
472
// Compile-time check that `PollChannel` is `Unpin`; only built for tests.
#[cfg(test)]
// The helper is deliberately left as a plain (non-const) function.
#[allow(clippy::missing_const_for_fn)]
fn _assert_unpin() {
    fn _requires_unpin<T>()
    where
        T: Unpin,
    {
    }
    _requires_unpin::<PollChannel<()>>();
}