ethers_flashbots/
pending_bundle.rs1use crate::bundle::BundleHash;
2use ethers::core::types::{Block, TxHash, U64};
3use ethers::providers::{
4 interval, JsonRpcClient, Middleware, Provider, ProviderError, DEFAULT_POLL_INTERVAL,
5};
6use futures_core::stream::Stream;
7use futures_util::stream::StreamExt;
8use pin_project::pin_project;
9use std::{
10 future::Future,
11 pin::Pin,
12 task::{Context, Poll},
13};
14use thiserror::Error;
15
16#[pin_project]
30pub struct PendingBundle<'a, P> {
31 pub bundle_hash: Option<BundleHash>,
32 pub block: U64,
33 pub transactions: Vec<TxHash>,
34 provider: &'a Provider<P>,
35 state: PendingBundleState<'a>,
36 interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
37}
38
39impl<'a, P: JsonRpcClient> PendingBundle<'a, P> {
40 pub fn new(
41 bundle_hash: Option<BundleHash>,
42 block: U64,
43 transactions: Vec<TxHash>,
44 provider: &'a Provider<P>,
45 ) -> Self {
46 Self {
47 bundle_hash,
48 block,
49 transactions,
50 provider,
51 state: PendingBundleState::PausedGettingBlock,
52 interval: Box::new(interval(DEFAULT_POLL_INTERVAL)),
53 }
54 }
55
56 #[deprecated(note = "use the bundle_hash field instead")]
58 pub fn bundle_hash(&self) -> Option<BundleHash> {
59 self.bundle_hash
60 }
61}
62
63impl<'a, P: JsonRpcClient> Future for PendingBundle<'a, P> {
64 type Output = Result<Option<BundleHash>, PendingBundleError>;
65
66 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
67 let this = self.project();
68
69 match this.state {
70 PendingBundleState::PausedGettingBlock => {
71 futures_util::ready!(this.interval.poll_next_unpin(ctx));
72 let fut = Box::pin(this.provider.get_block(*this.block));
73 *this.state = PendingBundleState::GettingBlock(fut);
74 ctx.waker().wake_by_ref();
75 }
76 PendingBundleState::GettingBlock(fut) => {
77 let block_res = futures_util::ready!(fut.as_mut().poll(ctx));
78
79 if block_res.is_err() {
81 *this.state = PendingBundleState::PausedGettingBlock;
82 ctx.waker().wake_by_ref();
83 return Poll::Pending;
84 }
85
86 let block_opt = block_res.unwrap();
87 if block_opt.is_none() {
89 *this.state = PendingBundleState::PausedGettingBlock;
90 ctx.waker().wake_by_ref();
91 return Poll::Pending;
92 }
93
94 let block = block_opt.unwrap();
95 if block.number.is_none() {
97 *this.state = PendingBundleState::PausedGettingBlock;
98 ctx.waker().wake_by_ref();
99 return Poll::Pending;
100 }
101
102 let included: bool = this
104 .transactions
105 .iter()
106 .all(|tx_hash| block.transactions.contains(tx_hash));
107
108 *this.state = PendingBundleState::Completed;
109 if included {
110 return Poll::Ready(Ok(*this.bundle_hash));
111 } else {
112 return Poll::Ready(Err(PendingBundleError::BundleNotIncluded));
113 }
114 }
115 PendingBundleState::Completed => {
116 panic!("polled pending bundle future after completion")
117 }
118 }
119
120 Poll::Pending
121 }
122}
123
124#[derive(Error, Debug)]
126pub enum PendingBundleError {
127 #[error("Bundle was not included in target block")]
129 BundleNotIncluded,
130 #[error(transparent)]
132 ProviderError(#[from] ProviderError),
133}
134
135type PinBoxFut<'a, T> = Pin<Box<dyn Future<Output = Result<T, ProviderError>> + Send + 'a>>;
136
137enum PendingBundleState<'a> {
138 PausedGettingBlock,
140
141 GettingBlock(PinBoxFut<'a, Option<Block<TxHash>>>),
143
144 Completed,
146}