1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use crate::bundle::BundleHash;
use ethers::core::types::{Block, TxHash, U64};
use ethers::providers::{
    interval, JsonRpcClient, Middleware, Provider, ProviderError, DEFAULT_POLL_INTERVAL,
};
use futures_core::stream::Stream;
use futures_util::stream::StreamExt;
use pin_project::pin_project;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};
use thiserror::Error;

/// A pending bundle is one that has been submitted to a relay,
/// but not yet included.
///
/// You can `await` the pending bundle. When the target block of the
/// bundle has been included in the chain the future will resolve,
/// either with the bundle hash indicating that the bundle was
/// included in the target block, or with an error indicating
/// that the bundle was not included in the target block.
///
/// While waiting, the future repeatedly asks the provider for the target
/// block, pausing for one tick of the poll interval before each attempt.
/// A failed request, a missing block and a block without a number are all
/// treated the same way: the request is retried after the next tick.
///
/// To figure out why your bundle was not included, refer to the
/// [Flashbots documentation][fb_debug].
///
/// [fb_debug]: https://docs.flashbots.net/flashbots-auction/searchers/faq/#why-didnt-my-transaction-get-included
#[pin_project]
pub struct PendingBundle<'a, P> {
    /// Hash of the bundle; this is the value the future resolves to on success.
    bundle_hash: BundleHash,
    /// Number of the target block that is requested from the provider.
    block: U64,
    /// Hashes of the transactions in the bundle. Only the first entry is
    /// compared against the transactions of the target block.
    transactions: Vec<TxHash>,
    /// Provider used to fetch the target block.
    provider: &'a Provider<P>,
    /// Current step of the wait / fetch / done cycle driven by `poll`.
    state: PendingBundleState<'a>,
    /// Stream of ticks (built from `DEFAULT_POLL_INTERVAL`) that paces the
    /// block requests.
    interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
}

impl<'a, P: JsonRpcClient> PendingBundle<'a, P> {
    /// Build a pending bundle that has not started polling yet.
    ///
    /// * `bundle_hash` - hash the future resolves to when the bundle is included.
    /// * `block` - number of the target block to look the bundle up in.
    /// * `transactions` - hashes of the transactions that make up the bundle.
    /// * `provider` - provider the target block is fetched from.
    ///
    /// The returned future begins in the paused state and ticks at
    /// `DEFAULT_POLL_INTERVAL`.
    pub fn new(
        bundle_hash: BundleHash,
        block: U64,
        transactions: Vec<TxHash>,
        provider: &'a Provider<P>,
    ) -> Self {
        // Named `ticker` so it does not shadow the `interval` constructor function.
        let ticker = Box::new(interval(DEFAULT_POLL_INTERVAL));
        let initial_state = PendingBundleState::PausedGettingBlock;

        Self {
            bundle_hash,
            block,
            transactions,
            provider,
            state: initial_state,
            interval: ticker,
        }
    }

    /// Return the hash identifying this pending bundle.
    pub fn bundle_hash(&self) -> BundleHash {
        let Self { bundle_hash, .. } = self;
        *bundle_hash
    }
}

impl<'a, P: JsonRpcClient> Future for PendingBundle<'a, P> {
    /// The bundle hash when the bundle was found in the target block,
    /// otherwise the reason it is considered not included.
    type Output = Result<BundleHash, PendingBundleError>;

    /// Advance the wait / fetch cycle by one step.
    ///
    /// * `PausedGettingBlock`: wait for the next interval tick, then start a
    ///   request for the target block.
    /// * `GettingBlock`: drive that request. If it fails, returns no block, or
    ///   returns a block without a number, go back to `PausedGettingBlock`.
    ///   Otherwise resolve, depending on whether the first bundle transaction
    ///   is listed in the block.
    ///
    /// A bundle with no transactions resolves to
    /// `PendingBundleError::BundleNotIncluded` once the target block is available.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already returned `Poll::Ready`.
    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
        let this = self.project();

        match this.state {
            PendingBundleState::PausedGettingBlock => {
                // Returns `Pending` here until the interval ticks; the interval
                // holds the waker in the meantime.
                futures_util::ready!(this.interval.poll_next_unpin(ctx));
                let fut = Box::pin(this.provider.get_block(*this.block));
                *this.state = PendingBundleState::GettingBlock(fut);
                // The new request has not been polled yet, so nothing would wake
                // this task; schedule another poll ourselves.
                ctx.waker().wake_by_ref();
                Poll::Pending
            }
            PendingBundleState::GettingBlock(fut) => {
                let block_res = futures_util::ready!(fut.as_mut().poll(ctx));

                let block = match block_res {
                    Ok(Some(block)) if block.number.is_some() => block,
                    // Provider error, block not known yet, or block still pending
                    // (no number): drop the result and retry after the next tick.
                    _ => {
                        *this.state = PendingBundleState::PausedGettingBlock;
                        ctx.waker().wake_by_ref();
                        return Poll::Pending;
                    }
                };

                // Since a bundle cannot be divided, we only need to check
                // if the first transaction was included.
                //
                // Empty bundles cannot be submitted, but `new` does not enforce
                // that, so use `first()` instead of indexing: an empty bundle is
                // reported as not included rather than panicking inside `poll`.
                let included = this
                    .transactions
                    .first()
                    .map_or(false, |first| block.transactions.contains(first));

                *this.state = PendingBundleState::Completed;
                if included {
                    Poll::Ready(Ok(*this.bundle_hash))
                } else {
                    Poll::Ready(Err(PendingBundleError::BundleNotIncluded))
                }
            }
            PendingBundleState::Completed => {
                panic!("polled pending bundle future after completion")
            }
        }
    }
}

/// Errors for pending bundles.
#[derive(Error, Debug)]
pub enum PendingBundleError {
    /// The bundle was not included in the target block.
    #[error("Bundle was not included in target block")]
    BundleNotIncluded,
    /// An error occurred while interacting with the RPC endpoint.
    ///
    /// Converts from `ProviderError` via `From`, and displays as the
    /// underlying error.
    // NOTE(review): the `Future` impl for `PendingBundle` in this file retries on
    // provider errors instead of returning this variant; confirm whether any
    // other caller constructs it.
    #[error(transparent)]
    ProviderError(#[from] ProviderError),
}

/// A pinned, boxed, `Send` future borrowing for `'a` that resolves to
/// `Result<T, ProviderError>`; used to store an in-flight provider request.
type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;

/// Steps of the polling cycle that `PendingBundle` moves through.
enum PendingBundleState<'a> {
    /// Waiting for an interval before calling API again.
    ///
    /// This is the initial state, and the state returned to whenever a block
    /// request did not produce a usable block.
    PausedGettingBlock,

    /// Polling the blockchain to get block information.
    ///
    /// Holds the in-flight request for the target block.
    GettingBlock(PinBoxFut<'a, Option<Block<TxHash>>>),

    /// Future has completed; polling again in this state panics.
    Completed,
}