alloy_flashblocks/
poller.rs1use alloy::{
4 eips::BlockNumberOrTag,
5 network::{BlockResponse, Network, primitives::HeaderResponse},
6 providers::Provider,
7};
8use futures_util::Stream;
9use std::{pin::Pin, task::Poll, time::Duration};
10use tokio::time::{Interval, interval};
11
12pub const FLASHBLOCK_INTERVAL: Duration = Duration::from_millis(200);
14
15pub const POLL_INTERVAL: Duration = Duration::from_millis(
17 (FLASHBLOCK_INTERVAL.as_millis() as f64 * 0.6) as u64
18);
19
20pub struct FlashblockPoller<N: Network, P: Provider<N>> {
25 provider: P,
26 interval: Interval,
27 last_hash: Option<alloy::primitives::B256>,
28 pending_request:
29 Option<Pin<Box<dyn std::future::Future<Output = Option<N::BlockResponse>> + Send>>>,
30}
31
32impl<N: Network, P: Provider<N>> Unpin for FlashblockPoller<N, P> {}
34
35impl<N: Network, P: Provider<N>> std::fmt::Debug for FlashblockPoller<N, P> {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("FlashblockPoller")
38 .field("last_hash", &self.last_hash)
39 .field("has_pending_request", &self.pending_request.is_some())
40 .finish_non_exhaustive()
41 }
42}
43
44impl<N: Network, P: Provider<N> + Clone + Send + Sync + 'static> FlashblockPoller<N, P> {
45 pub fn new(provider: P) -> Self {
47 Self {
48 provider,
49 interval: interval(POLL_INTERVAL),
50 last_hash: None,
51 pending_request: None,
52 }
53 }
54
55 pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = N::BlockResponse> + Send>>
57 where
58 Self: Send + 'static,
59 {
60 Box::pin(self)
61 }
62}
63
64impl<N: Network, P: Provider<N> + Clone + Send + Sync + 'static> Stream for FlashblockPoller<N, P> {
65 type Item = N::BlockResponse;
66
67 fn poll_next(
68 self: Pin<&mut Self>,
69 cx: &mut std::task::Context<'_>,
70 ) -> Poll<Option<Self::Item>> {
71 let this = self.get_mut();
72
73 loop {
74 if let Some(ref mut fut) = this.pending_request {
76 match fut.as_mut().poll(cx) {
77 Poll::Ready(result) => {
78 this.pending_request = None;
79
80 if let Some(block) = result {
81 let block_hash = block.header().hash();
82
83 if this.last_hash != Some(block_hash) {
85 this.last_hash = Some(block_hash);
86 return Poll::Ready(Some(block));
87 }
88 }
89 }
91 Poll::Pending => return Poll::Pending,
92 }
93 }
94
95 match this.interval.poll_tick(cx) {
97 Poll::Ready(_) => {
98 let provider = this.provider.clone();
100 this.pending_request = Some(Box::pin(async move {
101 provider
102 .get_block_by_number(BlockNumberOrTag::Pending)
103 .await
104 .ok()
105 .flatten()
106 }));
107 }
108 Poll::Pending => return Poll::Pending,
109 }
110 }
111 }
112}