ethers_flashbots/
pending_bundle.rs

1use crate::bundle::BundleHash;
2use ethers::core::types::{Block, TxHash, U64};
3use ethers::providers::{
4    interval, JsonRpcClient, Middleware, Provider, ProviderError, DEFAULT_POLL_INTERVAL,
5};
6use futures_core::stream::Stream;
7use futures_util::stream::StreamExt;
8use pin_project::pin_project;
9use std::{
10    future::Future,
11    pin::Pin,
12    task::{Context, Poll},
13};
14use thiserror::Error;
15
16/// A pending bundle is one that has been submitted to a relay,
17/// but not yet included.
18///
19/// You can `await` the pending bundle. When the target block of the
20/// bundle has been included in the chain the future will resolve,
21/// either with the bundle hash indicating that the bundle was
22/// included in the target block, or with an error indicating
23/// that the bundle was not included in the target block.
24///
25/// To figure out why your bundle was not included, refer to the
26/// [Flashbots documentation][fb_debug].
27///
28/// [fb_debug]: https://docs.flashbots.net/flashbots-auction/searchers/faq/#why-didnt-my-transaction-get-included
29#[pin_project]
30pub struct PendingBundle<'a, P> {
31    pub bundle_hash: Option<BundleHash>,
32    pub block: U64,
33    pub transactions: Vec<TxHash>,
34    provider: &'a Provider<P>,
35    state: PendingBundleState<'a>,
36    interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
37}
38
39impl<'a, P: JsonRpcClient> PendingBundle<'a, P> {
40    pub fn new(
41        bundle_hash: Option<BundleHash>,
42        block: U64,
43        transactions: Vec<TxHash>,
44        provider: &'a Provider<P>,
45    ) -> Self {
46        Self {
47            bundle_hash,
48            block,
49            transactions,
50            provider,
51            state: PendingBundleState::PausedGettingBlock,
52            interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
53        }
54    }
55
56    /// Get the bundle hash for this pending bundle.
57    #[deprecated(note = "use the bundle_hash field instead")]
58    pub fn bundle_hash(&self) -> Option<BundleHash> {
59        self.bundle_hash
60    }
61}
62
63impl<'a, P: JsonRpcClient> Future for PendingBundle<'a, P> {
64    type Output = Result<Option<BundleHash>, PendingBundleError>;
65
66    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
67        let this = self.project();
68
69        match this.state {
70            PendingBundleState::PausedGettingBlock => {
71                futures_util::ready!(this.interval.poll_next_unpin(ctx));
72                let fut = Box::pin(this.provider.get_block(*this.block));
73                *this.state = PendingBundleState::GettingBlock(fut);
74                ctx.waker().wake_by_ref();
75            }
76            PendingBundleState::GettingBlock(fut) => {
77                let block_res = futures_util::ready!(fut.as_mut().poll(ctx));
78
79                // If the provider errors, we try again after some interval.
80                if block_res.is_err() {
81                    *this.state = PendingBundleState::PausedGettingBlock;
82                    ctx.waker().wake_by_ref();
83                    return Poll::Pending;
84                }
85
86                let block_opt = block_res.unwrap();
87                // If the block doesn't exist yet, we try again after some interval.
88                if block_opt.is_none() {
89                    *this.state = PendingBundleState::PausedGettingBlock;
90                    ctx.waker().wake_by_ref();
91                    return Poll::Pending;
92                }
93
94                let block = block_opt.unwrap();
95                // If the block is pending, we try again after some interval.
96                if block.number.is_none() {
97                    *this.state = PendingBundleState::PausedGettingBlock;
98                    ctx.waker().wake_by_ref();
99                    return Poll::Pending;
100                }
101
102                // Check if all transactions of the bundle are present in the block
103                let included: bool = this
104                    .transactions
105                    .iter()
106                    .all(|tx_hash| block.transactions.contains(tx_hash));
107
108                *this.state = PendingBundleState::Completed;
109                if included {
110                    return Poll::Ready(Ok(*this.bundle_hash));
111                } else {
112                    return Poll::Ready(Err(PendingBundleError::BundleNotIncluded));
113                }
114            }
115            PendingBundleState::Completed => {
116                panic!("polled pending bundle future after completion")
117            }
118        }
119
120        Poll::Pending
121    }
122}
123
124/// Errors for pending bundles.
125#[derive(Error, Debug)]
126pub enum PendingBundleError {
127    /// The bundle was not included in the target block.
128    #[error("Bundle was not included in target block")]
129    BundleNotIncluded,
130    /// An error occured while interacting with the RPC endpoint.
131    #[error(transparent)]
132    ProviderError(#[from] ProviderError),
133}
134
135type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
136
137enum PendingBundleState<'a> {
138    /// Waiting for an interval before calling API again
139    PausedGettingBlock,
140
141    /// Polling the blockchain to get block information
142    GettingBlock(PinBoxFut<'a, Option<Block<TxHash>>>),
143
144    /// Future has completed
145    Completed,
146}