1use crate::WeakClient;
2use alloy_json_rpc::{RpcRecv, RpcSend};
3use alloy_transport::utils::Spawnable;
4use futures::{ready, stream::FusedStream, Future, FutureExt, Stream, StreamExt};
5use serde::Serialize;
6use serde_json::value::RawValue;
7use std::{
8 borrow::Cow,
9 marker::PhantomData,
10 ops::{Deref, DerefMut},
11 pin::Pin,
12 task::{Context, Poll},
13 time::Duration,
14};
15use tokio::sync::broadcast;
16use tokio_stream::wrappers::BroadcastStream;
17use tracing::Span;
18
19#[cfg(target_family = "wasm")]
20use wasmtimer::tokio::{sleep, Sleep};
21
22#[cfg(not(target_family = "wasm"))]
23use tokio::time::{sleep, Sleep};
24
25#[derive(Debug)]
62#[must_use = "this builder does nothing unless you call `spawn` or `into_stream`"]
63pub struct PollerBuilder<Params, Resp> {
64 client: WeakClient,
66
67 method: Cow<'static, str>,
69 params: Params,
70
71 channel_size: usize,
73 poll_interval: Duration,
74 limit: usize,
75
76 _pd: PhantomData<fn() -> Resp>,
77}
78
79impl<Params, Resp> PollerBuilder<Params, Resp>
80where
81 Params: RpcSend + 'static,
82 Resp: RpcRecv,
83{
84 pub fn new(client: WeakClient, method: impl Into<Cow<'static, str>>, params: Params) -> Self {
86 let poll_interval =
87 client.upgrade().map_or_else(|| Duration::from_secs(7), |c| c.poll_interval());
88 Self {
89 client,
90 method: method.into(),
91 params,
92 channel_size: 16,
93 poll_interval,
94 limit: usize::MAX,
95 _pd: PhantomData,
96 }
97 }
98
99 pub const fn channel_size(&self) -> usize {
101 self.channel_size
102 }
103
104 pub const fn set_channel_size(&mut self, channel_size: usize) {
106 self.channel_size = channel_size;
107 }
108
109 pub const fn with_channel_size(mut self, channel_size: usize) -> Self {
111 self.set_channel_size(channel_size);
112 self
113 }
114
115 pub const fn limit(&self) -> usize {
117 self.limit
118 }
119
120 pub fn set_limit(&mut self, limit: Option<usize>) {
122 self.limit = limit.unwrap_or(usize::MAX);
123 }
124
125 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
127 self.set_limit(limit);
128 self
129 }
130
131 pub const fn poll_interval(&self) -> Duration {
133 self.poll_interval
134 }
135
136 pub const fn set_poll_interval(&mut self, poll_interval: Duration) {
138 self.poll_interval = poll_interval;
139 }
140
141 pub const fn with_poll_interval(mut self, poll_interval: Duration) -> Self {
143 self.set_poll_interval(poll_interval);
144 self
145 }
146
147 pub fn spawn(self) -> PollChannel<Resp>
149 where
150 Resp: Clone,
151 {
152 let (tx, rx) = broadcast::channel(self.channel_size);
153 self.into_future(tx).spawn_task();
154 rx.into()
155 }
156
157 async fn into_future(self, tx: broadcast::Sender<Resp>)
158 where
159 Resp: Clone,
160 {
161 let mut stream = self.into_stream();
162 while let Some(resp) = stream.next().await {
163 if tx.send(resp).is_err() {
164 debug!("channel closed");
165 break;
166 }
167 }
168 }
169
170 pub fn into_stream(self) -> PollerStream<Resp> {
175 PollerStream::new(self)
176 }
177
178 pub fn client(&self) -> WeakClient {
180 self.client.clone()
181 }
182}
183
184enum PollState<Resp> {
186 Paused,
188 Waiting,
190 Polling(
192 alloy_transport::Pbf<
193 'static,
194 Resp,
195 alloy_transport::RpcError<alloy_transport::TransportErrorKind>,
196 >,
197 ),
198 Sleeping(Pin<Box<Sleep>>),
200
201 Finished,
203}
204
205pub struct PollerStream<Resp, Output = Resp, Map = fn(Resp) -> Output> {
231 client: WeakClient,
232 method: Cow<'static, str>,
233 params: Box<RawValue>,
234 poll_interval: Duration,
235 limit: usize,
236 poll_count: usize,
237 state: PollState<Resp>,
238 span: Span,
239 map: Map,
240 _pd: PhantomData<fn() -> Output>,
241}
242
243impl<Resp, Output, Map> std::fmt::Debug for PollerStream<Resp, Output, Map> {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 f.debug_struct("PollerStream")
246 .field("method", &self.method)
247 .field("poll_interval", &self.poll_interval)
248 .field("limit", &self.limit)
249 .field("poll_count", &self.poll_count)
250 .finish_non_exhaustive()
251 }
252}
253
254impl<Resp> PollerStream<Resp> {
255 fn new<Params: Serialize>(builder: PollerBuilder<Params, Resp>) -> Self {
256 let span = debug_span!("poller", method = %builder.method);
257
258 let params = serde_json::value::to_raw_value(&builder.params).unwrap_or_else(|err| {
260 error!(%err, "failed to serialize params during initialization");
261 Box::<RawValue>::default()
263 });
264
265 Self {
266 client: builder.client,
267 method: builder.method,
268 params,
269 poll_interval: builder.poll_interval,
270 limit: builder.limit,
271 poll_count: 0,
272 state: PollState::Waiting,
273 span,
274 map: std::convert::identity,
275 _pd: PhantomData,
276 }
277 }
278
279 pub fn client(&self) -> WeakClient {
281 self.client.clone()
282 }
283
284 pub fn pause(&mut self) {
288 self.state = PollState::Paused;
289 }
290
291 pub fn unpause(&mut self) {
295 if matches!(self.state, PollState::Paused) {
296 self.state = PollState::Waiting;
297 }
298 }
299}
300
301impl<Resp, Output, Map> PollerStream<Resp, Output, Map>
302where
303 Map: Fn(Resp) -> Output,
304{
305 pub fn map<NewOutput, NewMap>(self, map: NewMap) -> PollerStream<Resp, NewOutput, NewMap>
307 where
308 NewMap: Fn(Resp) -> NewOutput,
309 {
310 PollerStream {
311 client: self.client,
312 method: self.method,
313 params: self.params,
314 poll_interval: self.poll_interval,
315 limit: self.limit,
316 poll_count: self.poll_count,
317 state: self.state,
318 span: self.span,
319 map,
320 _pd: PhantomData,
321 }
322 }
323}
324
325impl<Resp, Output, Map> Stream for PollerStream<Resp, Output, Map>
326where
327 Resp: RpcRecv + 'static,
328 Map: Fn(Resp) -> Output + Unpin,
329{
330 type Item = Output;
331
332 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
333 let this = self.get_mut();
334 let _guard = this.span.enter();
335
336 loop {
337 match &mut this.state {
338 PollState::Paused => return Poll::Pending,
339 PollState::Waiting => {
340 if this.poll_count >= this.limit {
342 debug!("poll limit reached");
343 this.state = PollState::Finished;
344 continue;
345 }
346
347 let Some(client) = this.client.upgrade() else {
349 debug!("client dropped");
350 this.state = PollState::Finished;
351 continue;
352 };
353
354 trace!("polling");
356 let method = this.method.clone();
357 let params = this.params.clone();
358 let fut = Box::pin(async move { client.request(method, params).await });
359 this.state = PollState::Polling(fut);
360 }
361 PollState::Polling(fut) => {
362 match ready!(fut.poll_unpin(cx)) {
363 Ok(resp) => {
364 this.poll_count += 1;
365 trace!(duration=?this.poll_interval, "sleeping");
367 let sleep = Box::pin(sleep(this.poll_interval));
368 this.state = PollState::Sleeping(sleep);
369 return Poll::Ready(Some((this.map)(resp)));
370 }
371 Err(err) => {
372 error!(%err, "failed to poll");
373
374 if let Some(resp) = err.as_error_resp() {
379 if resp.message.contains("filter not found") {
380 warn!("server has dropped the filter, stopping poller");
381 this.state = PollState::Finished;
382 continue;
383 }
384 }
385
386 trace!(duration=?this.poll_interval, "sleeping after error");
388
389 let sleep = Box::pin(sleep(this.poll_interval));
390 this.state = PollState::Sleeping(sleep);
391 }
392 }
393 }
394 PollState::Sleeping(sleep) => {
395 ready!(sleep.as_mut().poll(cx));
396 this.state = PollState::Waiting;
397 }
398 PollState::Finished => {
399 return Poll::Ready(None);
400 }
401 }
402 }
403 }
404}
405
406impl<Resp, Output, Map> FusedStream for PollerStream<Resp, Output, Map>
407where
408 Resp: RpcRecv + 'static,
409 Map: Fn(Resp) -> Output + Unpin,
410{
411 fn is_terminated(&self) -> bool {
412 matches!(self.state, PollState::Finished)
413 }
414}
415
416#[derive(Debug)]
428pub struct PollChannel<Resp> {
429 rx: broadcast::Receiver<Resp>,
430}
431
432impl<Resp> From<broadcast::Receiver<Resp>> for PollChannel<Resp> {
433 fn from(rx: broadcast::Receiver<Resp>) -> Self {
434 Self { rx }
435 }
436}
437
438impl<Resp> Deref for PollChannel<Resp> {
439 type Target = broadcast::Receiver<Resp>;
440
441 fn deref(&self) -> &Self::Target {
442 &self.rx
443 }
444}
445
446impl<Resp> DerefMut for PollChannel<Resp> {
447 fn deref_mut(&mut self) -> &mut Self::Target {
448 &mut self.rx
449 }
450}
451
452impl<Resp> PollChannel<Resp>
453where
454 Resp: RpcRecv + Clone,
455{
456 pub fn resubscribe(&self) -> Self {
458 Self { rx: self.rx.resubscribe() }
459 }
460
461 pub fn into_stream(self) -> impl Stream<Item = Resp> + Unpin {
463 self.into_stream_raw().filter_map(|r| futures::future::ready(r.ok()))
464 }
465
466 pub fn into_stream_raw(self) -> BroadcastStream<Resp> {
469 self.rx.into()
470 }
471}
472
473#[cfg(test)]
474#[allow(clippy::missing_const_for_fn)]
475fn _assert_unpin() {
476 fn _assert<T: Unpin>() {}
477 _assert::<PollChannel<()>>();
478}