// ethers_providers/toolbox/pending_escalator.rs
#![allow(clippy::return_self_not_must_use)]
2
3use ethers_core::types::{Bytes, TransactionReceipt, H256};
4use futures_timer::Delay;
5use futures_util::{stream::FuturesUnordered, StreamExt};
6use instant::{Duration, Instant};
7use pin_project::pin_project;
8use std::{future::Future, pin::Pin, task::Poll};
9
10use crate::{
11 utils::PinBoxFut, JsonRpcClient, Middleware, PendingTransaction, Provider, ProviderError,
12};
13
14enum EscalatorStates<'a, P> {
16 Initial(PinBoxFut<'a, PendingTransaction<'a, P>>),
17 Sleeping(Pin<Box<Delay>>),
18 BroadcastingNew(PinBoxFut<'a, PendingTransaction<'a, P>>),
19 CheckingReceipts(FuturesUnordered<PinBoxFut<'a, Option<TransactionReceipt>>>),
20 Completed,
21}
22
23#[must_use]
26#[pin_project(project = PendingProj)]
27#[derive(Debug)]
28pub struct EscalatingPending<'a, P>
29where
30 P: JsonRpcClient,
31{
32 provider: &'a Provider<P>,
33 broadcast_interval: Duration,
34 polling_interval: Duration,
35 txns: Vec<Bytes>,
36 last: Instant,
37 sent: Vec<H256>,
38 state: EscalatorStates<'a, P>,
39}
40
41impl<'a, P> EscalatingPending<'a, P>
42where
43 P: JsonRpcClient,
44{
45 pub(crate) fn new(provider: &'a Provider<P>, mut txns: Vec<Bytes>) -> Self {
53 if txns.is_empty() {
54 panic!("bad args");
55 }
56
57 let first = txns.pop().expect("bad args");
58 Self {
60 provider,
61 broadcast_interval: Duration::from_millis(150),
62 polling_interval: Duration::from_millis(10),
63 txns,
64 last: Instant::now(),
67 sent: vec![],
68 state: EscalatorStates::Initial(Box::pin(provider.send_raw_transaction(first))),
69 }
70 }
71
72 pub fn with_broadcast_interval(mut self, duration: impl Into<Duration>) -> Self {
75 self.broadcast_interval = duration.into();
76 self
77 }
78
79 pub fn with_polling_interval(mut self, duration: impl Into<Duration>) -> Self {
82 self.polling_interval = duration.into();
83 self
84 }
85
86 pub fn get_polling_interval(&self) -> Duration {
88 self.polling_interval
89 }
90
91 pub fn get_broadcast_interval(&self) -> Duration {
93 self.broadcast_interval
94 }
95}
96
/// Switches the state machine to `CheckingReceipts` and returns
/// `Poll::Pending` from the enclosing function.
///
/// One receipt lookup is created for every hash in `$this.sent`. The waker is
/// notified before returning so the new state gets polled.
///
/// `$cx` is the task context and `$this` the pin-projection of the escalator.
macro_rules! check_all_receipts {
    ($cx:ident, $this:ident) => {
        let futs: futures_util::stream::FuturesUnordered<_> = $this
            .sent
            .iter()
            .map(|tx_hash| $this.provider.get_transaction_receipt(*tx_hash))
            .collect();
        // The variant is fully qualified because item paths in `macro_rules!`
        // resolve at the invocation site; the macro must not depend on the
        // caller having `use EscalatorStates::*` in scope.
        *$this.state = EscalatorStates::CheckingReceipts(futs);
        $cx.waker().wake_by_ref();
        return Poll::Pending
    };
}
109
/// Parks the state machine in `Sleeping` for one polling interval and returns
/// `Poll::Pending` from the enclosing function.
///
/// The waker is notified right away so the freshly created delay gets polled.
/// `$cx` is the task context and `$this` the pin-projection of the escalator.
macro_rules! sleep {
    ($cx:ident, $this:ident) => {
        let delay = Delay::new(*$this.polling_interval);
        *$this.state = EscalatorStates::Sleeping(Box::pin(delay));
        $cx.waker().wake_by_ref();
        return Poll::Pending
    };
}
117
/// Marks the state machine as `Completed` and returns `Poll::Ready($output)`
/// from the enclosing function.
///
/// `$this` is the pin-projection of the escalator.
macro_rules! completed {
    ($this:ident, $output:expr) => {
        // The variant is fully qualified because item paths in `macro_rules!`
        // resolve at the invocation site; the macro must not depend on the
        // caller having `use EscalatorStates::*` in scope.
        *$this.state = EscalatorStates::Completed;
        return Poll::Ready($output)
    };
}
124
125fn is_nonce_too_low(e: &ProviderError) -> bool {
127 let debug_str = format!("{e:?}");
128
129 debug_str.contains("nonce too low") || debug_str.contains("nonce is too low") || debug_str.contains("invalid transaction nonce") }
133
/// Polls the in-flight broadcast future `$fut` and advances the state machine.
///
/// * Still pending: returns `Poll::Pending` from the enclosing function.
/// * Success: records the broadcast time and the transaction hash, logs it,
///   then moves on to checking receipts.
/// * "Nonce too low"-style error: treated as non-fatal; moves on to checking
///   receipts without recording a hash.
/// * Any other error: logs it and completes the future with that error.
///
/// `$cx` is the task context and `$this` the pin-projection of the escalator.
macro_rules! poll_broadcast_fut {
    ($cx:ident, $this:ident, $fut:ident) => {
        match futures_util::ready!($fut.as_mut().poll($cx)) {
            Ok(pending) => {
                let tx_hash = *pending;
                *$this.last = Instant::now();
                $this.sent.push(tx_hash);
                tracing::info!(
                    tx_hash = ?tx_hash,
                    escalation = $this.sent.len(),
                    "Escalation transaction broadcast complete"
                );
                check_all_receipts!($cx, $this);
            }
            // NOTE(review): presumably this means an earlier transaction was
            // already mined while this one was being sent; confirm.
            Err(e) if is_nonce_too_low(&e) => {
                check_all_receipts!($cx, $this);
            }
            Err(e) => {
                tracing::error!(
                    error = ?e,
                    "Error during transaction broadcast"
                );

                completed!($this, Err(e));
            }
        }
    };
}
165
166impl<'a, P> Future for EscalatingPending<'a, P>
167where
168 P: JsonRpcClient,
169{
170 type Output = Result<TransactionReceipt, ProviderError>;
171
172 #[cfg_attr(target_arch = "wasm32", allow(unused_must_use))]
173 fn poll(
174 self: std::pin::Pin<&mut Self>,
175 cx: &mut std::task::Context<'_>,
176 ) -> std::task::Poll<Self::Output> {
177 use EscalatorStates::*;
178
179 let this = self.project();
180
181 match this.state {
182 Initial(fut) => {
185 poll_broadcast_fut!(cx, this, fut);
186 }
187 Sleeping(delay) => {
188 futures_util::ready!(delay.as_mut().poll(cx));
189 if this.last.elapsed() > *this.broadcast_interval {
192 if let Some(next_to_broadcast) = this.txns.pop() {
193 let fut = this.provider.send_raw_transaction(next_to_broadcast);
194 *this.state = BroadcastingNew(fut);
195 cx.waker().wake_by_ref();
196 return Poll::Pending
197 }
198 }
199 check_all_receipts!(cx, this);
200 }
201 BroadcastingNew(fut) => {
204 poll_broadcast_fut!(cx, this, fut);
205 }
206 CheckingReceipts(futs) => {
207 match futs.poll_next_unpin(cx) {
211 Poll::Ready(Some(Ok(Some(receipt)))) => {
215 completed!(this, Ok(receipt));
216 }
217 Poll::Ready(Some(Ok(None))) => cx.waker().wake_by_ref(),
221 Poll::Ready(Some(Err(e))) => {
223 completed!(this, Err(e));
224 }
225 Poll::Ready(None) => {
229 sleep!(cx, this);
230 }
231 Poll::Pending => return Poll::Pending,
233 }
234 }
235 Completed => panic!("polled after completion"),
236 }
237
238 Poll::Pending
239 }
240}
241
242impl<'a, P> std::fmt::Debug for EscalatorStates<'a, P> {
243 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244 let state = match self {
245 Self::Initial(_) => "Initial",
246 Self::Sleeping(_) => "Sleeping",
247 Self::BroadcastingNew(_) => "BroadcastingNew",
248 Self::CheckingReceipts(_) => "CheckingReceipts",
249 Self::Completed => "Completed",
250 };
251 f.debug_struct("EscalatorStates").field("state", &state).finish()
252 }
253}