alloy_rpc_client/
poller.rs1use crate::WeakClient;
2use alloy_json_rpc::{RpcError, RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use async_stream::stream;
5use futures::{Stream, StreamExt};
6use serde::Serialize;
7use serde_json::value::RawValue;
8use std::{
9 borrow::Cow,
10 marker::PhantomData,
11 ops::{Deref, DerefMut},
12 time::Duration,
13};
14use tokio::sync::broadcast;
15use tokio_stream::wrappers::BroadcastStream;
16use tracing_futures::Instrument;
17
18#[cfg(target_arch = "wasm32")]
19use wasmtimer::tokio::sleep;
20
21#[cfg(not(target_arch = "wasm32"))]
22use tokio::time::sleep;
23
/// Retry budget for polling: the number of recoverable transport errors the poller will retry
/// before it gives up and stops.
const MAX_RETRIES: usize = 3;
26
/// Builder for a poller that repeatedly calls one RPC method and hands out the responses.
///
/// The builder holds only a weak reference to the client. It is turned into a running poller
/// with `spawn` (background task feeding a broadcast channel) or `into_stream` (a stream that
/// polls as it is consumed).
#[derive(Debug)]
#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
pub struct PollerBuilder<Params, Resp> {
    /// Weak handle to the RPC client; polling stops once it can no longer be upgraded.
    client: WeakClient,

    /// Name of the RPC method that is called on every poll.
    method: Cow<'static, str>,
    /// Parameters sent with every call.
    params: Params,

    /// Capacity of the broadcast channel created by `spawn`.
    channel_size: usize,
    /// Time to sleep between two polls.
    poll_interval: Duration,
    /// Maximum number of poll iterations; `usize::MAX` stands for "no limit".
    limit: usize,

    /// Ties the response type to the builder without storing a value of it.
    _pd: PhantomData<fn() -> Resp>,
}
82
83impl<Params, Resp> PollerBuilder<Params, Resp>
84where
85 Params: RpcSend + 'static,
86 Resp: RpcRecv + Clone,
87{
88 pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
90 let poll_interval =
91 client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
92 Self {
93 client,
94 method: method.into(),
95 params,
96 channel_size: 16,
97 poll_interval,
98 limit: usize::MAX,
99 _pd: PhantomData,
100 }
101 }
102
103 pub const fn channel_size(&self) -> usize {
105 self.channel_size
106 }
107
108 pub fn set_channel_size(&mut self, channel_size: usize) {
110 self.channel_size = channel_size;
111 }
112
113 pub fn with_channel_size(mut self, channel_size: usize) -> Self {
115 self.set_channel_size(channel_size);
116 self
117 }
118
119 pub const fn limit(&self) -> usize {
121 self.limit
122 }
123
124 pub fn set_limit(&mut self, limit: Option<usize>) {
126 self.limit = limit.unwrap_or(usize::MAX);
127 }
128
129 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
131 self.set_limit(limit);
132 self
133 }
134
135 pub const fn poll_interval(&self) -> Duration {
137 self.poll_interval
138 }
139
140 pub fn set_poll_interval(&mut self, poll_interval: Duration) {
142 self.poll_interval = poll_interval;
143 }
144
145 pub fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
147 self.set_poll_interval(poll_interval);
148 self
149 }
150
151 pub fn spawn(self) -> PollChannel<Resp> {
153 let (tx, rx) = broadcast::channel(self.channel_size);
154 self.into_future(tx).spawn_task();
155 rx.into()
156 }
157
158 async fn into_future(self, tx: broadcast::Sender<Resp>) {
159 let mut stream = self.into_stream();
160 while let Some(resp) = stream.next().await {
161 if tx.send(resp).is_err() {
162 debug!("channel closed");
163 break;
164 }
165 }
166 }
167
168 pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
173 Box::pin(self.into_local_stream())
174 }
175
176 fn into_local_stream(self) -> impl Stream<Item = Resp> {
177 let span = debug_span!("poller", method = %self.method);
178 stream! {
179 let mut params = ParamsOnce::Typed(self.params);
180 let mut retries = MAX_RETRIES;
181 'outer: for _ in 0..self.limit {
182 let Some(client) = self.client.upgrade() else {
183 debug!("client dropped");
184 break;
185 };
186
187 let params = match params.get() {
189 Ok(p) => p,
190 Err(err) => {
191 error!(%err, "failed to serialize params");
192 break;
193 }
194 };
195
196 loop {
197 trace!("polling");
198 match client.request(self.method.clone(), params).await {
199 Ok(resp) => yield resp,
200 Err(RpcError::Transport(err)) if retries > 0 && err.recoverable() => {
201 debug!(%err, "failed to poll, retrying");
202 retries -= 1;
203 continue;
204 }
205 Err(err) => {
206 error!(%err, "failed to poll");
207 break 'outer;
208 }
209 }
210 break;
211 }
212
213 trace!(duration=?self.poll_interval, "sleeping");
214 sleep(self.poll_interval).await;
215 }
216 }
217 .instrument(span)
218 }
219}
220
/// Receiving end of a spawned poller.
///
/// A thin wrapper around a [`broadcast::Receiver`] that dereferences to the receiver and can
/// additionally be converted into a stream.
#[derive(Debug)]
pub struct PollChannel<Resp> {
    /// The wrapped broadcast receiver.
    rx: broadcast::Receiver<Resp>,
}
236
237impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
238 fn from(rx: broadcast::Receiver<Resp>) -> Self {
239 Self { rx }
240 }
241}
242
impl<Resp> Deref for PollChannel<Resp> {
    type Target = broadcast::Receiver<Resp>;

    /// Gives shared access to the wrapped broadcast receiver.
    fn deref(&self) -> &Self::Target {
        &self.rx
    }
}
250
impl<Resp> DerefMut for PollChannel<Resp> {
    /// Gives mutable access to the wrapped broadcast receiver.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.rx
    }
}
256
257impl<Resp> PollChannel<Resp>
258where
259 Resp: RpcRecv + Clone,
260{
261 pub fn resubscribe(&self) -> Self {
263 Self { rx: self.rx.resubscribe() }
264 }
265
266 pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
269 self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
270 }
271
272 pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
275 self.rx.into()
276 }
277}
278
/// Request parameters that are serialized to raw JSON at most once.
enum ParamsOnce<P> {
    /// The parameters as passed in, not yet serialized.
    Typed(P),
    /// The parameters after serialization to raw JSON.
    Serialized(Box<RawValue>),
}
284
285impl<P: Serialize> ParamsOnce<P> {
286 #[inline]
287 fn get(&mut self) -> serde_json::Result<&RawValue> {
288 match self {
289 Self::Typed(_) => self.init(),
290 Self::Serialized(p) => Ok(p),
291 }
292 }
293
294 #[cold]
295 fn init(&mut self) -> serde_json::Result<&RawValue> {
296 let Self::Typed(p) = self else { unreachable!() };
297 let v = serde_json::value::to_raw_value(p)?;
298 *self = Self::Serialized(v);
299 let Self::Serialized(v) = self else { unreachable!() };
300 Ok(v)
301 }
302}
303
// Compile-time check that `PollChannel` is `Unpin`; the function is never meant to be called.
#[cfg(test)]
#[allow(clippy::missing_const_for_fn)]
fn _assert_unpin() {
    // Only type-checks when `T: Unpin`.
    fn _assert<T: Unpin>() {}
    _assert::<PollChannel<()>>();
}