ethers_providers/toolbox/pending_escalator.rs

#![allow(clippy::return_self_not_must_use)]

use ethers_core::types::{Bytes, TransactionReceipt, H256};
use futures_timer::Delay;
use futures_util::{stream::FuturesUnordered, StreamExt};
use instant::{Duration, Instant};
use pin_project::pin_project;
use std::{future::Future, pin::Pin, task::Poll};

use crate::{
    utils::PinBoxFut, JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError,
};

/// States for the EscalatingPending future
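///
/// The escalator runs as a small state machine. `Initial` awaits the first
/// (cheapest) broadcast; every successful broadcast moves to
/// `CheckingReceipts`, which queries a receipt for each transaction sent so
/// far. If none is found, the future moves to `Sleeping` for one polling
/// interval, after which it either broadcasts the next escalation
/// (`BroadcastingNew`, once the broadcast interval has elapsed and another
/// transaction is queued) or checks receipts again. Finding a receipt, or
/// hitting an unrecoverable error, moves to `Completed`.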
enum EscalatorStates<'a, P> {
    Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
    Sleeping(Pin<Box<Delay>>),
    BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
    CheckingReceipts(FuturesUnordered<PinBoxFut<'a, Option<TransactionReceipt>>>),
    Completed,
}

/// An EscalatingPending is a pending transaction that increases its own gas
/// price over time, by broadcasting successive versions with higher gas prices.
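///
/// # Example
///
/// A minimal sketch of how an escalating transaction is typically created and
/// awaited. It assumes the `Middleware::send_escalating` entry point, a toy
/// linear gas-price policy, and a placeholder node URL; in practice you would
/// usually call this through a client that can sign, such as a
/// `SignerMiddleware`.
///
/// ```no_run
/// # use std::convert::TryFrom;
/// # use std::time::Duration;
/// # use ethers_core::types::{transaction::eip2718::TypedTransaction, Address, TransactionRequest, U256};
/// # use ethers_providers::{Http, Middleware, Provider};
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let provider = Provider::<Http>::try_from("http://localhost:8545")?;
///
/// // A placeholder transfer; use a real recipient and value.
/// let tx: TypedTransaction = TransactionRequest::pay(Address::zero(), 100u64).into();
///
/// // Broadcast up to 5 escalations, bumping the gas price linearly each time,
/// // then await whichever version confirms first.
/// let escalator = provider
///     .send_escalating(
///         &tx,
///         5,
///         Box::new(|price: U256, attempt: usize| price * U256::from((attempt + 1) as u64)),
///     )
///     .await?
///     .with_broadcast_interval(Duration::from_secs(10));
/// let receipt = escalator.await?;
/// println!("confirmed in block {:?}", receipt.block_number);
/// # Ok(())
/// # }
/// ```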
#[must_use]
#[pin_project(project = PendingProj)]
#[derive(Debug)]
pub struct EscalatingPending<'a, P>
where
    P: JsonRpcClient,
{
    provider: &'a Provider<P>,
    broadcast_interval: Duration,
    polling_interval: Duration,
    txns: Vec<Bytes>,
    last: Instant,
    sent: Vec<H256>,
    state: EscalatorStates<'a, P>,
}

impl<'a, P> EscalatingPending<'a, P>
where
    P: JsonRpcClient,
{
    /// Instantiate a new EscalatingPending. This should only be called by the
    /// Middleware trait.
    ///
    /// Callers MUST ensure that transactions are in _reverse_ broadcast order
    /// (this just makes writing the code easier, as we can use `pop()` a lot).
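    ///
    /// For example, if the escalations are priced at 10, 20, and 30 gwei, the
    /// vec should hold them as `[tx@30, tx@20, tx@10]` so that `pop()` yields
    /// the cheapest transaction to broadcast first.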
    ///
    /// TODO: consider deserializing and checking invariants (gas order, etc.)
    pub(crate) fn new(provider: &'a Provider<P>, mut txns: Vec<Bytes>) -> Self {
        if txns.is_empty() {
            panic!("bad args");
        }

        let first = txns.pop().expect("bad args");
        // Sane-feeling default intervals
        Self {
            provider,
            broadcast_interval: Duration::from_millis(150),
            polling_interval: Duration::from_millis(10),
            txns,
            // placeholder value. We set this again after the initial broadcast
            // future resolves
            last: Instant::now(),
            sent: vec![],
            state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))),
        }
    }

    /// Set the broadcast interval. This controls how often the escalator
    /// broadcasts a new transaction at a higher gas price.
    pub fn with_broadcast_interval(mut self, duration: impl Into<Duration>) -> Self {
        self.broadcast_interval = duration.into();
        self
    }

    /// Set the polling interval. This controls how often the escalator checks
    /// transaction receipts for confirmation.
    pub fn with_polling_interval(mut self, duration: impl Into<Duration>) -> Self {
        self.polling_interval = duration.into();
        self
    }

    /// Get the current polling interval.
    pub fn get_polling_interval(&self) -> Duration {
        self.polling_interval
    }

    /// Get the current broadcast interval.
    pub fn get_broadcast_interval(&self) -> Duration {
        self.broadcast_interval
    }
}

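// Transition to `CheckingReceipts`: spawn a `get_transaction_receipt` request
// for every transaction hash broadcast so far, then re-wake the task.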
macro_rules! check_all_receipts {
    ($cx:ident, $this:ident) => {
        let futs: futures_util::stream::FuturesUnordered<_> = $this
            .sent
            .iter()
            .map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash))
            .collect();
        *$this.state = CheckingReceipts(futs);
        $cx.waker().wake_by_ref();
        return Poll::Pending
    };
}

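// Transition to `Sleeping` for one polling interval, then re-wake the task.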
macro_rules! sleep {
    ($cx:ident, $this:ident) => {
        *$this.state = EscalatorStates::Sleeping(Box::pin(Delay::new(*$this.polling_interval)));
        $cx.waker().wake_by_ref();
        return Poll::Pending
    };
}

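// Mark the escalator finished and resolve the future with `$output`.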
macro_rules! completed {
    ($this:ident, $output:expr) => {
        *$this.state = Completed;
        return Poll::Ready($output)
    };
}

/// Checks whether a `ProviderError` indicates a nonce-too-low error by
/// inspecting its `Debug` representation.
fn is_nonce_too_low(e: &ProviderError) -> bool {
    let debug_str = format!("{e:?}");

    debug_str.contains("nonce too low") // Geth, Arbitrum, Optimism
            || debug_str.contains("nonce is too low") // Parity
            || debug_str.contains("invalid transaction nonce") // Arbitrum
}

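// Drive a pending `send_raw_transaction` future. On success, record the new
// hash and start checking receipts; a nonce-too-low error means an earlier
// escalation already confirmed, so check receipts instead of failing.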
macro_rules! poll_broadcast_fut {
    ($cx:ident, $this:ident, $fut:ident) => {
        match $fut.as_mut().poll($cx) {
            Poll::Ready(Ok(pending)) => {
                *$this.last = Instant::now();
                $this.sent.push(*pending);
                tracing::info!(
                    tx_hash = ?*pending,
                    escalation = $this.sent.len(),
                    "Escalation transaction broadcast complete"
                );
                check_all_receipts!($cx, $this);
            }
            Poll::Ready(Err(e)) => {
                // kludge. Prevents erroring on "nonce too low" which indicates
                // a previous escalation confirmed during this broadcast attempt
                if is_nonce_too_low(&e) {
                    check_all_receipts!($cx, $this);
                } else {
                    tracing::error!(
                        error = ?e,
                        "Error during transaction broadcast"
                    );

                    completed!($this, Err(e));
                }
            }
            Poll::Pending => return Poll::Pending,
        }
    };
}

impl<'a, P> Future for EscalatingPending<'a, P>
where
    P: JsonRpcClient,
{
    type Output = Result<TransactionReceipt, ProviderError>;

    #[cfg_attr(target_arch = "wasm32", allow(unused_must_use))]
    fn poll(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        use EscalatorStates::*;

        let this = self.project();

        match this.state {
            // In the initial state we're simply waiting on the first
            // transaction broadcast to complete.
            Initial(fut) => {
                poll_broadcast_fut!(cx, this, fut);
            }
            Sleeping(delay) => {
                futures_util::ready!(delay.as_mut().poll(cx));
                // If the broadcast interval has elapsed and we still have a
                // queued transaction, broadcast the next escalation
                if this.last.elapsed() > *this.broadcast_interval {
                    if let Some(next_to_broadcast) = this.txns.pop() {
                        let fut = this.provider.send_raw_transaction(next_to_broadcast);
                        *this.state = BroadcastingNew(fut);
                        cx.waker().wake_by_ref();
                        return Poll::Pending
                    }
                }
                check_all_receipts!(cx, this);
            }
            // This state is functionally equivalent to Initial, but we
            // differentiate it for clarity
            BroadcastingNew(fut) => {
                poll_broadcast_fut!(cx, this, fut);
            }
            CheckingReceipts(futs) => {
                // Poll the set of `get_transaction_receipt` futures to check
                // if any previously-broadcast transaction was confirmed.
                // Continue doing this until all are resolved
                match futs.poll_next_unpin(cx) {
                    // We have found a receipt. This means that all other
                    // broadcast txns are now invalid, so we can drop the
                    // futures and complete
                    Poll::Ready(Some(Ok(Some(receipt)))) => {
                        completed!(this, Ok(receipt));
                    }
                    // A `get_transaction_receipt` request resolved, but we
                    // found no receipt. Re-wake and check whether any other
                    // requests have resolved
                    Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(),
                    // A request errored. We complete the future with the error.
                    Poll::Ready(Some(Err(e))) => {
                        completed!(this, Err(e));
                    }
                    // We have run out of `get_transaction_receipt` requests.
                    // Sleep and then check if we should broadcast again (or
                    // check receipts again)
                    Poll::Ready(None) => {
                        sleep!(cx, this);
                    }
                    // No request has resolved yet. Try again later
                    Poll::Pending => return Poll::Pending,
                }
            }
            Completed => panic!("polled after completion"),
        }

        Poll::Pending
    }
}

impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let state = match self {
            Self::Initial(_) => "Initial",
            Self::Sleeping(_) => "Sleeping",
            Self::BroadcastingNew(_) => "BroadcastingNew",
            Self::CheckingReceipts(_) => "CheckingReceipts",
            Self::Completed => "Completed",
        };
        f.debug_struct("EscalatorStates").field("state", &state).finish()
    }
}