alloy_flashblocks/
poller.rs

1//! FlashblockPoller for streaming pending block updates via RPC polling.
2
3use alloy::{
4    eips::BlockNumberOrTag,
5    network::{BlockResponse, Network, primitives::HeaderResponse},
6    providers::Provider,
7};
8use futures_util::Stream;
9use std::{pin::Pin, task::Poll, time::Duration};
10use tokio::time::{Interval, interval};
11
/// Nominal spacing between flashblock emissions (~200ms on Base).
pub const FLASHBLOCK_INTERVAL: Duration = Duration::from_millis(200);

/// How often the pending block is polled: 0.6 × [`FLASHBLOCK_INTERVAL`]
/// (following Alloy convention), which is 120ms.
///
/// The 0.6 factor is applied as `× 3 / 5` on whole milliseconds, so the value
/// is computed without a floating-point round-trip.
pub const POLL_INTERVAL: Duration =
    Duration::from_millis(FLASHBLOCK_INTERVAL.as_millis() as u64 * 3 / 5);
19
20/// Polls for pending block updates via JSON-RPC.
21///
22/// Calls `eth_getBlockByNumber("pending")` every 120ms and yields
23/// new blocks when the block hash changes.
24pub struct FlashblockPoller<N: Network, P: Provider<N>> {
25    provider: P,
26    interval: Interval,
27    last_hash: Option<alloy::primitives::B256>,
28    pending_request:
29        Option<Pin<Box<dyn std::future::Future<Output = Option<N::BlockResponse>> + Send>>>,
30}
31
32// Safety: All fields are Unpin (Box<dyn Future> is Unpin, Interval is Unpin, etc.)
33impl<N: Network, P: Provider<N>> Unpin for FlashblockPoller<N, P> {}
34
35impl<N: Network, P: Provider<N>> std::fmt::Debug for FlashblockPoller<N, P> {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("FlashblockPoller")
38            .field("last_hash", &self.last_hash)
39            .field("has_pending_request", &self.pending_request.is_some())
40            .finish_non_exhaustive()
41    }
42}
43
44impl<N: Network, P: Provider<N> + Clone + Send + Sync + 'static> FlashblockPoller<N, P> {
45    /// Create a new poller.
46    pub fn new(provider: P) -> Self {
47        Self {
48            provider,
49            interval: interval(POLL_INTERVAL),
50            last_hash: None,
51            pending_request: None,
52        }
53    }
54
55    /// Convert to a boxed stream for easier usage.
56    pub fn into_stream(self) -> Pin<Box<dyn Stream<Item = N::BlockResponse> + Send>>
57    where
58        Self: Send + 'static,
59    {
60        Box::pin(self)
61    }
62}
63
64impl<N: Network, P: Provider<N> + Clone + Send + Sync + 'static> Stream for FlashblockPoller<N, P> {
65    type Item = N::BlockResponse;
66
67    fn poll_next(
68        self: Pin<&mut Self>,
69        cx: &mut std::task::Context<'_>,
70    ) -> Poll<Option<Self::Item>> {
71        let this = self.get_mut();
72
73        loop {
74            // If we have a pending request, poll it
75            if let Some(ref mut fut) = this.pending_request {
76                match fut.as_mut().poll(cx) {
77                    Poll::Ready(result) => {
78                        this.pending_request = None;
79
80                        if let Some(block) = result {
81                            let block_hash = block.header().hash();
82
83                            // Only yield if block changed
84                            if this.last_hash != Some(block_hash) {
85                                this.last_hash = Some(block_hash);
86                                return Poll::Ready(Some(block));
87                            }
88                        }
89                        // Block didn't change or was None, continue to next tick
90                    }
91                    Poll::Pending => return Poll::Pending,
92                }
93            }
94
95            // Wait for next interval tick
96            match this.interval.poll_tick(cx) {
97                Poll::Ready(_) => {
98                    // Start a new request
99                    let provider = this.provider.clone();
100                    this.pending_request = Some(Box::pin(async move {
101                        provider
102                            .get_block_by_number(BlockNumberOrTag::Pending)
103                            .await
104                            .ok()
105                            .flatten()
106                    }));
107                }
108                Poll::Pending => return Poll::Pending,
109            }
110        }
111    }
112}