ethers_providers/toolbox/
pending_transaction.rs

1use crate::{
2    utils::{interval, PinBoxFut},
3    JsonRpcClient, Middleware, Provider, ProviderError,
4};
5use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64};
6use futures_core::stream::Stream;
7use futures_timer::Delay;
8use futures_util::stream::StreamExt;
9use instant::Duration;
10use pin_project::pin_project;
11use std::{
12    fmt,
13    future::Future,
14    ops::Deref,
15    pin::Pin,
16    task::{Context, Poll},
17};
18
19/// A pending transaction is a transaction which has been submitted but is not yet mined.
20/// `await`'ing on a pending transaction will resolve to a transaction receipt
21/// once the transaction has enough `confirmations`. The default number of confirmations
22/// is 1, but may be adjusted with the `confirmations` method. If the transaction does not
23/// have enough confirmations or is not mined, the future will stay in the pending state.
24///
25/// # Example
26///
27/// ```ignore
28/// use ethers_core::types::TransactionRequest;
29///
30/// let tx = TransactionRequest::new().to(to).value(1000).from(from);
31/// let receipt = client
32///     .send_transaction(tx, None)
33///     .await?                           // PendingTransaction<_>
34///     .log_msg("Pending transfer hash") // print pending tx hash with message
35///     .await?;                          // Result<Option<TransactionReceipt>, _>
36/// ```
37#[pin_project]
38pub struct PendingTransaction<'a, P> {
39    tx_hash: TxHash,
40    confirmations: usize,
41    provider: &'a Provider<P>,
42    state: PendingTxState<'a>,
43    interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
44    retries_remaining: usize,
45}
46
47const DEFAULT_RETRIES: usize = 3;
48
49impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
50    /// Creates a new pending transaction poller from a hash and a provider
51    pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
52        let delay = Box::pin(Delay::new(provider.get_interval()));
53
54        Self {
55            tx_hash,
56            confirmations: 1,
57            provider,
58            state: PendingTxState::InitialDelay(delay),
59            interval: Box::new(interval(provider.get_interval())),
60            retries_remaining: DEFAULT_RETRIES,
61        }
62    }
63
64    /// Returns the Provider associated with the pending transaction
65    pub fn provider(&self) -> Provider<P>
66    where
67        P: Clone,
68    {
69        self.provider.clone()
70    }
71
72    /// Returns the transaction hash of the pending transaction
73    pub fn tx_hash(&self) -> TxHash {
74        self.tx_hash
75    }
76
77    /// Sets the number of confirmations for the pending transaction to resolve
78    /// to a receipt
79    #[must_use]
80    pub fn confirmations(mut self, confs: usize) -> Self {
81        self.confirmations = confs;
82        self
83    }
84
85    /// Sets the polling interval
86    #[must_use]
87    pub fn interval<T: Into<Duration>>(mut self, duration: T) -> Self {
88        let duration = duration.into();
89
90        self.interval = Box::new(interval(duration));
91
92        if matches!(self.state, PendingTxState::InitialDelay(_)) {
93            self.state = PendingTxState::InitialDelay(Box::pin(Delay::new(duration)))
94        }
95
96        self
97    }
98
99    /// Set retries
100    #[must_use]
101    pub fn retries(mut self, retries: usize) -> Self {
102        self.retries_remaining = retries;
103        self
104    }
105}
106
107impl<'a, P> PendingTransaction<'a, P> {
108    /// Allows inspecting the content of a pending transaction in a builder-like way to avoid
109    /// more verbose calls, e.g.:
110    /// `let mined = token.transfer(recipient, amt).send().await?.inspect(|tx| println!(".{}",
111    /// *tx)).await?;`
112    pub fn inspect<F>(self, mut f: F) -> Self
113    where
114        F: FnMut(&Self),
115    {
116        f(&self);
117        self
118    }
119
120    /// Logs the pending transaction hash along with a custom message before it.
121    pub fn log_msg<S: std::fmt::Display>(self, msg: S) -> Self {
122        self.inspect(|s| println!("{msg}: {:?}", **s))
123    }
124
125    /// Logs the pending transaction's hash
126    pub fn log(self) -> Self {
127        self.inspect(|s| println!("Pending hash: {:?}", **s))
128    }
129}
130
131macro_rules! rewake_with_new_state {
132    ($ctx:ident, $this:ident, $new_state:expr) => {
133        *$this.state = $new_state;
134        $ctx.waker().wake_by_ref();
135        return Poll::Pending
136    };
137}
138
139macro_rules! rewake_with_new_state_if {
140    ($condition:expr, $ctx:ident, $this:ident, $new_state:expr) => {
141        if $condition {
142            rewake_with_new_state!($ctx, $this, $new_state);
143        }
144    };
145}
146
147impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
148    type Output = Result<Option<TransactionReceipt>, ProviderError>;
149
150    #[cfg_attr(target_arch = "wasm32", allow(unused_must_use))]
151    fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
152        let this = self.project();
153
154        match this.state {
155            PendingTxState::InitialDelay(fut) => {
156                futures_util::ready!(fut.as_mut().poll(ctx));
157                tracing::debug!("Starting to poll pending tx {:?}", *this.tx_hash);
158                let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
159                rewake_with_new_state!(ctx, this, PendingTxState::GettingTx(fut));
160            }
161            PendingTxState::PausedGettingTx => {
162                // Wait the polling period so that we do not spam the chain when no
163                // new block has been mined
164                let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
165                let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
166                *this.state = PendingTxState::GettingTx(fut);
167                ctx.waker().wake_by_ref();
168            }
169            PendingTxState::GettingTx(fut) => {
170                let tx_res = futures_util::ready!(fut.as_mut().poll(ctx));
171                // If the provider errors, just try again after the interval.
172                // nbd.
173                rewake_with_new_state_if!(
174                    tx_res.is_err(),
175                    ctx,
176                    this,
177                    PendingTxState::PausedGettingTx
178                );
179
180                let tx_opt = tx_res.unwrap();
181                // If the tx is no longer in the mempool, return Ok(None)
182                if tx_opt.is_none() {
183                    if *this.retries_remaining == 0 {
184                        tracing::debug!("Dropped from mempool, pending tx {:?}", *this.tx_hash);
185                        *this.state = PendingTxState::Completed;
186                        return Poll::Ready(Ok(None))
187                    }
188
189                    *this.retries_remaining -= 1;
190                    rewake_with_new_state!(ctx, this, PendingTxState::PausedGettingTx);
191                }
192
193                // If it hasn't confirmed yet, poll again later
194                let tx = tx_opt.unwrap();
195                rewake_with_new_state_if!(
196                    tx.block_number.is_none(),
197                    ctx,
198                    this,
199                    PendingTxState::PausedGettingTx
200                );
201
202                // Start polling for the receipt now
203                tracing::debug!("Getting receipt for pending tx {:?}", *this.tx_hash);
204                let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
205                rewake_with_new_state!(ctx, this, PendingTxState::GettingReceipt(fut));
206            }
207            PendingTxState::PausedGettingReceipt => {
208                // Wait the polling period so that we do not spam the chain when no
209                // new block has been mined
210                let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
211                let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
212                *this.state = PendingTxState::GettingReceipt(fut);
213                ctx.waker().wake_by_ref();
214            }
215            PendingTxState::GettingReceipt(fut) => {
216                if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
217                    tracing::debug!("Checking receipt for pending tx {:?}", *this.tx_hash);
218                    *this.state = PendingTxState::CheckingReceipt(receipt)
219                } else {
220                    *this.state = PendingTxState::PausedGettingReceipt
221                }
222                ctx.waker().wake_by_ref();
223            }
224            PendingTxState::CheckingReceipt(receipt) => {
225                rewake_with_new_state_if!(
226                    receipt.is_none(),
227                    ctx,
228                    this,
229                    PendingTxState::PausedGettingReceipt
230                );
231
232                // If we requested more than 1 confirmation, we need to compare the receipt's
233                // block number and the current block
234                if *this.confirmations > 1 {
235                    tracing::debug!("Waiting on confirmations for pending tx {:?}", *this.tx_hash);
236
237                    let fut = Box::pin(this.provider.get_block_number());
238                    *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
239
240                    // Schedule the waker to poll again
241                    ctx.waker().wake_by_ref();
242                } else {
243                    let receipt = receipt.take();
244                    *this.state = PendingTxState::Completed;
245                    return Poll::Ready(Ok(receipt))
246                }
247            }
248            PendingTxState::PausedGettingBlockNumber(receipt) => {
249                // Wait the polling period so that we do not spam the chain when no
250                // new block has been mined
251                let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
252
253                // we need to re-instantiate the get_block_number future so that
254                // we poll again
255                let fut = Box::pin(this.provider.get_block_number());
256                *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
257                ctx.waker().wake_by_ref();
258            }
259            PendingTxState::GettingBlockNumber(fut, receipt) => {
260                let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;
261
262                // This is safe so long as we only enter the `GettingBlock`
263                // loop from `CheckingReceipt`, which contains an explicit
264                // `is_none` check
265                let receipt = receipt.take().expect("GettingBlockNumber without receipt");
266
267                // Wait for the interval
268                let inclusion_block = receipt
269                    .block_number
270                    .expect("Receipt did not have a block number. This should never happen");
271                // if the transaction has at least K confirmations, return the receipt
272                // (subtract 1 since the tx already has 1 conf when it's mined)
273                if current_block > inclusion_block + *this.confirmations - 1 {
274                    let receipt = Some(receipt);
275                    *this.state = PendingTxState::Completed;
276                    return Poll::Ready(Ok(receipt))
277                } else {
278                    tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", current_block - inclusion_block + 1, this.confirmations);
279                    *this.state = PendingTxState::PausedGettingBlockNumber(Some(receipt));
280                    ctx.waker().wake_by_ref();
281                }
282            }
283            PendingTxState::Completed => {
284                panic!("polled pending transaction future after completion")
285            }
286        };
287
288        Poll::Pending
289    }
290}
291
292impl<'a, P> fmt::Debug for PendingTransaction<'a, P> {
293    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294        f.debug_struct("PendingTransaction")
295            .field("tx_hash", &self.tx_hash)
296            .field("confirmations", &self.confirmations)
297            .field("state", &self.state)
298            .finish()
299    }
300}
301
302impl<'a, P> PartialEq for PendingTransaction<'a, P> {
303    fn eq(&self, other: &Self) -> bool {
304        self.tx_hash == other.tx_hash
305    }
306}
307
308impl<'a, P> PartialEq<TxHash> for PendingTransaction<'a, P> {
309    fn eq(&self, other: &TxHash) -> bool {
310        &self.tx_hash == other
311    }
312}
313
314impl<'a, P> Eq for PendingTransaction<'a, P> {}
315
316impl<'a, P> Deref for PendingTransaction<'a, P> {
317    type Target = TxHash;
318
319    fn deref(&self) -> &Self::Target {
320        &self.tx_hash
321    }
322}
323
324// We box the TransactionReceipts to keep the enum small.
325enum PendingTxState<'a> {
326    /// Initial delay to ensure the GettingTx loop doesn't immediately fail
327    InitialDelay(Pin<Box<Delay>>),
328
329    /// Waiting for interval to elapse before calling API again
330    PausedGettingTx,
331
332    /// Polling The blockchain to see if the Tx has confirmed or dropped
333    GettingTx(PinBoxFut<'a, Option<Transaction>>),
334
335    /// Waiting for interval to elapse before calling API again
336    PausedGettingReceipt,
337
338    /// Polling the blockchain for the receipt
339    GettingReceipt(PinBoxFut<'a, Option<TransactionReceipt>>),
340
341    /// If the pending tx required only 1 conf, it will return early. Otherwise it will
342    /// proceed to the next state which will poll the block number until there have been
343    /// enough confirmations
344    CheckingReceipt(Option<TransactionReceipt>),
345
346    /// Waiting for interval to elapse before calling API again
347    PausedGettingBlockNumber(Option<TransactionReceipt>),
348
349    /// Polling the blockchain for the current block number
350    GettingBlockNumber(PinBoxFut<'a, U64>, Option<TransactionReceipt>),
351
352    /// Future has completed and should panic if polled again
353    Completed,
354}
355
356impl<'a> fmt::Debug for PendingTxState<'a> {
357    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358        let state = match self {
359            PendingTxState::InitialDelay(_) => "InitialDelay",
360            PendingTxState::PausedGettingTx => "PausedGettingTx",
361            PendingTxState::GettingTx(_) => "GettingTx",
362            PendingTxState::PausedGettingReceipt => "PausedGettingReceipt",
363            PendingTxState::GettingReceipt(_) => "GettingReceipt",
364            PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
365            PendingTxState::PausedGettingBlockNumber(_) => "PausedGettingBlockNumber",
366            PendingTxState::CheckingReceipt(_) => "CheckingReceipt",
367            PendingTxState::Completed => "Completed",
368        };
369
370        f.debug_struct("PendingTxState").field("state", &state).finish()
371    }
372}