ethers_providers/toolbox/
pending_transaction.rs1use crate::{
2 utils::{interval, PinBoxFut},
3 JsonRpcClient, Middleware, Provider, ProviderError,
4};
5use ethers_core::types::{Transaction, TransactionReceipt, TxHash, U64};
6use futures_core::stream::Stream;
7use futures_timer::Delay;
8use futures_util::stream::StreamExt;
9use instant::Duration;
10use pin_project::pin_project;
11use std::{
12 fmt,
13 future::Future,
14 ops::Deref,
15 pin::Pin,
16 task::{Context, Poll},
17};
18
19#[pin_project]
38pub struct PendingTransaction<'a, P> {
39 tx_hash: TxHash,
40 confirmations: usize,
41 provider: &'a Provider<P>,
42 state: PendingTxState<'a>,
43 interval: Box<dyn Stream<Item = ()> + Send + Unpin>,
44 retries_remaining: usize,
45}
46
47const DEFAULT_RETRIES: usize = 3;
48
49impl<'a, P: JsonRpcClient> PendingTransaction<'a, P> {
50 pub fn new(tx_hash: TxHash, provider: &'a Provider<P>) -> Self {
52 let delay = Box::pin(Delay::new(provider.get_interval()));
53
54 Self {
55 tx_hash,
56 confirmations: 1,
57 provider,
58 state: PendingTxState::InitialDelay(delay),
59 interval: Box::new(interval(provider.get_interval())),
60 retries_remaining: DEFAULT_RETRIES,
61 }
62 }
63
64 pub fn provider(&self) -> Provider<P>
66 where
67 P: Clone,
68 {
69 self.provider.clone()
70 }
71
72 pub fn tx_hash(&self) -> TxHash {
74 self.tx_hash
75 }
76
77 #[must_use]
80 pub fn confirmations(mut self, confs: usize) -> Self {
81 self.confirmations = confs;
82 self
83 }
84
85 #[must_use]
87 pub fn interval<T: Into<Duration>>(mut self, duration: T) -> Self {
88 let duration = duration.into();
89
90 self.interval = Box::new(interval(duration));
91
92 if matches!(self.state, PendingTxState::InitialDelay(_)) {
93 self.state = PendingTxState::InitialDelay(Box::pin(Delay::new(duration)))
94 }
95
96 self
97 }
98
99 #[must_use]
101 pub fn retries(mut self, retries: usize) -> Self {
102 self.retries_remaining = retries;
103 self
104 }
105}
106
107impl<'a, P> PendingTransaction<'a, P> {
108 pub fn inspect<F>(self, mut f: F) -> Self
113 where
114 F: FnMut(&Self),
115 {
116 f(&self);
117 self
118 }
119
120 pub fn log_msg<S: std::fmt::Display>(self, msg: S) -> Self {
122 self.inspect(|s| println!("{msg}: {:?}", **s))
123 }
124
125 pub fn log(self) -> Self {
127 self.inspect(|s| println!("Pending hash: {:?}", **s))
128 }
129}
130
131macro_rules! rewake_with_new_state {
132 ($ctx:ident, $this:ident, $new_state:expr) => {
133 *$this.state = $new_state;
134 $ctx.waker().wake_by_ref();
135 return Poll::Pending
136 };
137}
138
139macro_rules! rewake_with_new_state_if {
140 ($condition:expr, $ctx:ident, $this:ident, $new_state:expr) => {
141 if $condition {
142 rewake_with_new_state!($ctx, $this, $new_state);
143 }
144 };
145}
146
147impl<'a, P: JsonRpcClient> Future for PendingTransaction<'a, P> {
148 type Output = Result<Option<TransactionReceipt>, ProviderError>;
149
150 #[cfg_attr(target_arch = "wasm32", allow(unused_must_use))]
151 fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
152 let this = self.project();
153
154 match this.state {
155 PendingTxState::InitialDelay(fut) => {
156 futures_util::ready!(fut.as_mut().poll(ctx));
157 tracing::debug!("Starting to poll pending tx {:?}", *this.tx_hash);
158 let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
159 rewake_with_new_state!(ctx, this, PendingTxState::GettingTx(fut));
160 }
161 PendingTxState::PausedGettingTx => {
162 let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
165 let fut = Box::pin(this.provider.get_transaction(*this.tx_hash));
166 *this.state = PendingTxState::GettingTx(fut);
167 ctx.waker().wake_by_ref();
168 }
169 PendingTxState::GettingTx(fut) => {
170 let tx_res = futures_util::ready!(fut.as_mut().poll(ctx));
171 rewake_with_new_state_if!(
174 tx_res.is_err(),
175 ctx,
176 this,
177 PendingTxState::PausedGettingTx
178 );
179
180 let tx_opt = tx_res.unwrap();
181 if tx_opt.is_none() {
183 if *this.retries_remaining == 0 {
184 tracing::debug!("Dropped from mempool, pending tx {:?}", *this.tx_hash);
185 *this.state = PendingTxState::Completed;
186 return Poll::Ready(Ok(None))
187 }
188
189 *this.retries_remaining -= 1;
190 rewake_with_new_state!(ctx, this, PendingTxState::PausedGettingTx);
191 }
192
193 let tx = tx_opt.unwrap();
195 rewake_with_new_state_if!(
196 tx.block_number.is_none(),
197 ctx,
198 this,
199 PendingTxState::PausedGettingTx
200 );
201
202 tracing::debug!("Getting receipt for pending tx {:?}", *this.tx_hash);
204 let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
205 rewake_with_new_state!(ctx, this, PendingTxState::GettingReceipt(fut));
206 }
207 PendingTxState::PausedGettingReceipt => {
208 let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
211 let fut = Box::pin(this.provider.get_transaction_receipt(*this.tx_hash));
212 *this.state = PendingTxState::GettingReceipt(fut);
213 ctx.waker().wake_by_ref();
214 }
215 PendingTxState::GettingReceipt(fut) => {
216 if let Ok(receipt) = futures_util::ready!(fut.as_mut().poll(ctx)) {
217 tracing::debug!("Checking receipt for pending tx {:?}", *this.tx_hash);
218 *this.state = PendingTxState::CheckingReceipt(receipt)
219 } else {
220 *this.state = PendingTxState::PausedGettingReceipt
221 }
222 ctx.waker().wake_by_ref();
223 }
224 PendingTxState::CheckingReceipt(receipt) => {
225 rewake_with_new_state_if!(
226 receipt.is_none(),
227 ctx,
228 this,
229 PendingTxState::PausedGettingReceipt
230 );
231
232 if *this.confirmations > 1 {
235 tracing::debug!("Waiting on confirmations for pending tx {:?}", *this.tx_hash);
236
237 let fut = Box::pin(this.provider.get_block_number());
238 *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
239
240 ctx.waker().wake_by_ref();
242 } else {
243 let receipt = receipt.take();
244 *this.state = PendingTxState::Completed;
245 return Poll::Ready(Ok(receipt))
246 }
247 }
248 PendingTxState::PausedGettingBlockNumber(receipt) => {
249 let _ready = futures_util::ready!(this.interval.poll_next_unpin(ctx));
252
253 let fut = Box::pin(this.provider.get_block_number());
256 *this.state = PendingTxState::GettingBlockNumber(fut, receipt.take());
257 ctx.waker().wake_by_ref();
258 }
259 PendingTxState::GettingBlockNumber(fut, receipt) => {
260 let current_block = futures_util::ready!(fut.as_mut().poll(ctx))?;
261
262 let receipt = receipt.take().expect("GettingBlockNumber without receipt");
266
267 let inclusion_block = receipt
269 .block_number
270 .expect("Receipt did not have a block number. This should never happen");
271 if current_block > inclusion_block + *this.confirmations - 1 {
274 let receipt = Some(receipt);
275 *this.state = PendingTxState::Completed;
276 return Poll::Ready(Ok(receipt))
277 } else {
278 tracing::trace!(tx_hash = ?this.tx_hash, "confirmations {}/{}", current_block - inclusion_block + 1, this.confirmations);
279 *this.state = PendingTxState::PausedGettingBlockNumber(Some(receipt));
280 ctx.waker().wake_by_ref();
281 }
282 }
283 PendingTxState::Completed => {
284 panic!("polled pending transaction future after completion")
285 }
286 };
287
288 Poll::Pending
289 }
290}
291
292impl<'a, P> fmt::Debug for PendingTransaction<'a, P> {
293 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
294 f.debug_struct("PendingTransaction")
295 .field("tx_hash", &self.tx_hash)
296 .field("confirmations", &self.confirmations)
297 .field("state", &self.state)
298 .finish()
299 }
300}
301
302impl<'a, P> PartialEq for PendingTransaction<'a, P> {
303 fn eq(&self, other: &Self) -> bool {
304 self.tx_hash == other.tx_hash
305 }
306}
307
308impl<'a, P> PartialEq<TxHash> for PendingTransaction<'a, P> {
309 fn eq(&self, other: &TxHash) -> bool {
310 &self.tx_hash == other
311 }
312}
313
314impl<'a, P> Eq for PendingTransaction<'a, P> {}
315
316impl<'a, P> Deref for PendingTransaction<'a, P> {
317 type Target = TxHash;
318
319 fn deref(&self) -> &Self::Target {
320 &self.tx_hash
321 }
322}
323
324enum PendingTxState<'a> {
326 InitialDelay(Pin<Box<Delay>>),
328
329 PausedGettingTx,
331
332 GettingTx(PinBoxFut<'a, Option<Transaction>>),
334
335 PausedGettingReceipt,
337
338 GettingReceipt(PinBoxFut<'a, Option<TransactionReceipt>>),
340
341 CheckingReceipt(Option<TransactionReceipt>),
345
346 PausedGettingBlockNumber(Option<TransactionReceipt>),
348
349 GettingBlockNumber(PinBoxFut<'a, U64>, Option<TransactionReceipt>),
351
352 Completed,
354}
355
356impl<'a> fmt::Debug for PendingTxState<'a> {
357 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358 let state = match self {
359 PendingTxState::InitialDelay(_) => "InitialDelay",
360 PendingTxState::PausedGettingTx => "PausedGettingTx",
361 PendingTxState::GettingTx(_) => "GettingTx",
362 PendingTxState::PausedGettingReceipt => "PausedGettingReceipt",
363 PendingTxState::GettingReceipt(_) => "GettingReceipt",
364 PendingTxState::GettingBlockNumber(_, _) => "GettingBlockNumber",
365 PendingTxState::PausedGettingBlockNumber(_) => "PausedGettingBlockNumber",
366 PendingTxState::CheckingReceipt(_) => "CheckingReceipt",
367 PendingTxState::Completed => "Completed",
368 };
369
370 f.debug_struct("PendingTxState").field("state", &state).finish()
371 }
372}