1use async_recursion::async_recursion;
2use ethers::{
3 providers::Middleware,
4 types::{
5 transaction::eip2718::TypedTransaction, Address, BlockId, BlockNumber, Bytes,
6 NameOrAddress, TransactionReceipt, H256, U256,
7 },
8};
9
10use std::default::Default;
11use std::fmt::Debug;
12use std::time::{Duration, Instant};
13use tracing::{error, info, trace, warn};
14
15use crate::gas_oracle::{GasInfo, GasOracle, GasOracleInfo, LegacyGasInfo};
16use crate::time::{DefaultTime, Time};
17use crate::transaction::{PersistentState, Priority, StaticTxData, SubmittedTxs, Transaction};
18use crate::{database::Database, gas_oracle::EIP1559GasInfo};
19
20const TRANSACTION_MINING_TIME: Duration = Duration::from_secs(60);
22const BLOCK_TIME: Duration = Duration::from_secs(20);
23
24#[derive(Debug, thiserror::Error)]
29pub enum Error<M: Middleware, GO: GasOracle, DB: Database> {
30 #[error("middleware: {0}")]
31 Middleware(M::Error),
32
33 #[error("database: {0}")]
34 Database(DB::Error),
35
36 #[error("gas oracle: error1 = ({0}), error2 = ({1})")]
37 GasOracle(GO::Error, M::Error),
38
39 #[error("nonce too low (expected: {expected_nonce}, current: {current_nonce})")]
40 NonceTooLow {
41 current_nonce: U256,
42 expected_nonce: U256,
43 },
44
45 #[error("internal error: latest block is none")]
46 LatestBlockIsNone,
47
48 #[error("internal error: latest base fee is none")]
49 LatestBaseFeeIsNone,
50
51 #[error("internal error: incompatible gas oracle ({0})")]
52 IncompatibleGasOracle(&'static str),
53}
54
55#[derive(Clone, Debug)]
60pub struct Configuration<T: Time> {
61 pub transaction_mining_time: Duration,
64
65 pub block_time: Duration,
69
70 pub time: T,
72}
73
74impl<T: Time> Configuration<T> {
75 pub fn set_transaction_mining_time(
76 mut self,
77 transaction_mining_time: Duration,
78 ) -> Configuration<T> {
79 self.transaction_mining_time = transaction_mining_time;
80 self
81 }
82
83 pub fn set_block_time(mut self, block_time: Duration) -> Configuration<T> {
84 self.block_time = block_time;
85 self
86 }
87
88 pub fn set_time(mut self, time: T) -> Configuration<T> {
89 self.time = time;
90 self
91 }
92}
93
94impl Default for Configuration<DefaultTime> {
95 fn default() -> Self {
96 Self {
97 transaction_mining_time: TRANSACTION_MINING_TIME,
98 block_time: BLOCK_TIME,
99 time: DefaultTime,
100 }
101 }
102}
103
104#[derive(Copy, Clone, Debug)]
109pub struct Chain {
110 pub id: u64,
111 pub is_legacy: bool,
112}
113
114impl Chain {
115 pub fn new(id: u64) -> Chain {
117 Self {
118 id,
119 is_legacy: false,
120 }
121 }
122
123 pub fn legacy(id: u64) -> Chain {
125 Self {
126 id,
127 is_legacy: true,
128 }
129 }
130}
131
132#[derive(Debug)]
137pub struct Manager<M: Middleware, GO: GasOracle, DB: Database, T: Time> {
138 provider: M,
139 gas_oracle: GO,
140 db: DB,
141 chain: Chain,
142 configuration: Configuration<T>,
143}
144
145impl<M: Middleware, GO: GasOracle, DB: Database, T: Time> Manager<M, GO, DB, T>
147where
148 M: Send + Sync,
149 GO: Send + Sync,
150 DB: Send + Sync,
151 T: Send + Sync,
152{
153 #[tracing::instrument(level = "trace", skip_all)]
157 pub async fn new(
158 provider: M,
159 gas_oracle: GO,
160 db: DB,
161 chain: Chain,
162 configuration: Configuration<T>,
163 ) -> Result<(Self, Option<TransactionReceipt>), Error<M, GO, DB>> {
164 let mut manager = Self {
165 provider,
166 gas_oracle,
167 db,
168 chain,
169 configuration,
170 };
171
172 trace!("Instantiating a new transaction manager => {:#?}", manager);
173
174 let transaction_receipt = match manager.db.get_state().await.map_err(Error::Database)? {
175 Some(mut state) => {
176 warn!("Dealing with previous state => {:#?}", state);
177
178 {
179 let current_nonce = manager.get_nonce(state.tx_data.transaction.from).await?;
180 let expected_nonce = state.tx_data.nonce;
181
182 if current_nonce > expected_nonce {
183 error!(
184 "Nonce too low! Current is `{}`, expected `{}`",
185 current_nonce, expected_nonce
186 );
187
188 return Err(Error::NonceTooLow {
189 current_nonce,
190 expected_nonce,
191 });
192 }
193 }
194
195 let wait_time = manager.get_wait_time(state.tx_data.confirmations, None);
196 let transaction_receipt = manager
197 .confirm_transaction(&mut state, wait_time, false)
198 .await?;
199 manager.db.clear_state().await.map_err(Error::Database)?;
200 Some(transaction_receipt)
201 }
202
203 None => None,
204 };
205
206 Ok((manager, transaction_receipt))
207 }
208
209 #[tracing::instrument(level = "trace", skip_all)]
210 pub async fn force_new(
211 provider: M,
212 gas_oracle: GO,
213 db: DB,
214 chain: Chain,
215 configuration: Configuration<T>,
216 ) -> Result<Self, Error<M, GO, DB>> {
217 let mut manager = Self {
218 provider,
219 gas_oracle,
220 db,
221 chain,
222 configuration,
223 };
224
225 trace!(
226 "Forcing the instantiation of a new transaction manager => {:#?}",
227 manager
228 );
229
230 trace!("Clearing DB state");
231 manager.db.clear_state().await.map_err(Error::Database)?;
232
233 Ok(manager)
234 }
235
236 #[tracing::instrument(level = "trace", skip_all)]
238 pub async fn send_transaction(
239 mut self,
240 transaction: Transaction,
241 confirmations: usize,
242 priority: Priority,
243 ) -> Result<(Self, TransactionReceipt), Error<M, GO, DB>> {
244 trace!("Sending the transaction.");
245
246 let mut state = {
247 let nonce = self.get_nonce(transaction.from).await?;
248
249 let tx_data = StaticTxData {
250 transaction,
251 nonce,
252 confirmations,
253 priority,
254 };
255
256 let submitted_txs = SubmittedTxs::new();
257
258 PersistentState {
259 tx_data,
260 submitted_txs,
261 }
262 };
263
264 let receipt = self.send_then_confirm_transaction(&mut state).await?;
265
266 info!(
267 "Transaction with nonce {:?} was sent. Transaction hash = {:?}.",
268 state.tx_data.nonce, receipt.transaction_hash
269 );
270
271 self.db.clear_state().await.map_err(Error::Database)?;
273
274 Ok((self, receipt))
275 }
276}
277
278impl<M: Middleware, GO: GasOracle, DB: Database, T: Time> Manager<M, GO, DB, T>
279where
280 M: Send + Sync,
281 GO: Send + Sync,
282 DB: Send + Sync,
283 T: Send + Sync,
284{
285 #[async_recursion]
286 #[tracing::instrument(level = "trace", skip_all)]
287 async fn send_then_confirm_transaction(
288 &mut self,
289 state: &mut PersistentState,
290 ) -> Result<TransactionReceipt, Error<M, GO, DB>> {
291 trace!("(Re)sending the transaction.");
292
293 let gas_oracle_info = self.get_gas_oracle_info(state.tx_data.priority).await?;
295
296 if let Some(block_time) = gas_oracle_info.block_time {
298 self.configuration.block_time = block_time;
299 }
300 let wait_time =
301 self.get_wait_time(state.tx_data.confirmations, gas_oracle_info.mining_time);
302
303 let typed_transaction: TypedTransaction = {
305 let mut typed_transaction = state
306 .tx_data
307 .to_typed_transaction(&self.chain, gas_oracle_info.gas_info);
308
309 typed_transaction.set_gas(
312 self.provider
313 .estimate_gas(&typed_transaction, None)
314 .await
315 .map_err(Error::Middleware)?,
316 );
317
318 typed_transaction
319 };
320
321 {
322 let (transaction_hash, raw_transaction) =
324 self.raw_transaction(&typed_transaction).await?;
325
326 if !state.submitted_txs.contains(transaction_hash) {
328 state.submitted_txs.add(transaction_hash);
330 self.db.set_state(state).await.map_err(Error::Database)?;
331 }
332
333 let result = self
335 .provider
336 .send_raw_transaction(raw_transaction)
337 .await
338 .map_err(Error::Middleware);
339
340 match result {
341 Ok(pending_transaction) => {
342 assert_eq!(
343 transaction_hash,
344 H256(*pending_transaction.as_fixed_bytes()),
345 "stored hash is different from the pending transaction's hash"
346 );
347 info!(
348 "The manager has submitted transaction with hash {:?} \
349 to the transaction pool, for a total of {:?} submitted \
350 transaction(s).",
351 transaction_hash,
352 state.submitted_txs.len()
353 );
354 }
355 Err(err) => {
356 if is_error(&err, "replacement transaction underpriced") {
357 assert!(!state.submitted_txs.is_empty());
358 warn!("Tried to send an underpriced transaction.");
359 } else if is_error(&err, "already known") {
361 assert!(!state.submitted_txs.is_empty());
362 warn!("Tried to send an already known transaction.");
363 } else {
365 error!("Error while submitting transaction: {:?}", err);
366 return Err(err);
367 }
368 }
369 };
370 };
371
372 self.confirm_transaction(state, wait_time, true).await
374 }
375
376 #[tracing::instrument(level = "trace", skip_all)]
377 async fn confirm_transaction(
378 &mut self,
379 state: &mut PersistentState,
380 wait_time: Duration,
381 sleep_first: bool,
382 ) -> Result<TransactionReceipt, Error<M, GO, DB>> {
383 trace!(
384 "Confirming transaction (nonce = {:?}).",
385 state.tx_data.nonce
386 );
387
388 let start_time = Instant::now();
389 let mut sleep_time = if sleep_first {
390 self.configuration.block_time
391 } else {
392 Duration::ZERO
393 };
394
395 loop {
396 self.configuration.time.sleep(sleep_time).await;
398
399 trace!("Were any of the transactions mined?");
401 let receipt = self.get_mined_transaction(state).await?;
402
403 match receipt {
404 Some(receipt) => {
405 let transaction_block = receipt.block_number.unwrap().as_usize();
406 let current_block = self
407 .provider
408 .get_block_number()
409 .await
410 .map_err(Error::Middleware)?
411 .as_usize();
412
413 trace!("Mined transaction block: {:?}.", transaction_block);
414 trace!("Current block: {:?}.", current_block);
415
416 assert!(current_block >= transaction_block);
418 let mut delta = (current_block - transaction_block) as i32;
419 delta = (state.tx_data.confirmations as i32) - delta;
420 trace!("{:?} more confirmation(s) required.", delta);
421 if delta <= 0 {
422 return Ok(receipt);
423 }
424 }
425 None => {
426 trace!("No transaction mined.");
427
428 let elapsed_time = self.configuration.time.elapsed(start_time);
430 if elapsed_time > wait_time {
431 trace!(
432 "I have waited too much! (elapsed = {:?}, max = {:?})",
433 elapsed_time,
434 wait_time
435 );
436 return self.send_then_confirm_transaction(state).await;
437 }
438 }
439 }
440
441 sleep_time = self.configuration.block_time;
442 }
443 }
444
445 #[tracing::instrument(level = "trace", skip_all)]
448 async fn get_provider_gas_oracle_info(&self) -> Result<GasOracleInfo, M::Error> {
449 let gas_info = if self.chain.is_legacy {
450 trace!("Calculating legacy gas price using the provider.");
451 let gas_price = self.provider.get_gas_price().await?;
452 trace!("(gas_price = {:?} wei)", gas_price);
453 GasInfo::Legacy(LegacyGasInfo { gas_price })
454 } else {
455 trace!("Estimating EIP1559 fees with the provider.");
456 let (max_fee, max_priority_fee) = self.provider.estimate_eip1559_fees(None).await?;
457 trace!(
458 "(max_fee = {:?}, max_priority_fee = {:?})",
459 max_fee,
460 max_priority_fee
461 );
462 GasInfo::EIP1559(EIP1559GasInfo {
463 max_fee,
464 max_priority_fee: Some(max_priority_fee),
465 })
466 };
467 Ok(GasOracleInfo {
468 gas_info,
469 mining_time: None,
470 block_time: None,
471 })
472 }
473
474 async fn get_max_priority_fee(&self, max_fee: U256) -> Result<U256, Error<M, GO, DB>> {
476 let base_fee = self
477 .provider
478 .get_block(BlockId::Number(BlockNumber::Latest))
479 .await
480 .map_err(Error::Middleware)?
481 .ok_or(Error::LatestBlockIsNone)?
482 .base_fee_per_gas
483 .ok_or(Error::LatestBaseFeeIsNone)?;
484
485 assert!(
486 max_fee > base_fee,
487 "max_fee({:?}) <= base_fee({:?})",
488 max_fee,
489 base_fee
490 );
491
492 Ok(max_fee - base_fee)
493 }
494
495 #[tracing::instrument(level = "trace", skip_all)]
498 async fn get_gas_oracle_info(
499 &self,
500 priority: Priority,
501 ) -> Result<GasOracleInfo, Error<M, GO, DB>> {
502 match self.gas_oracle.get_info(priority).await {
503 Ok(mut gas_oracle_info) => {
504 assert_eq!(gas_oracle_info.gas_info.is_legacy(), self.chain.is_legacy);
505
506 if let GasInfo::EIP1559(mut eip1559_gas_info) = gas_oracle_info.gas_info {
507 if eip1559_gas_info.max_priority_fee.is_none() {
508 eip1559_gas_info.max_priority_fee =
509 Some(self.get_max_priority_fee(eip1559_gas_info.max_fee).await?);
510 gas_oracle_info.gas_info = GasInfo::EIP1559(eip1559_gas_info);
511 };
512 }
513
514 Ok(gas_oracle_info)
515 }
516 Err(err1) => {
517 trace!(
518 "Gas oracle has failed and/or is defaulting to the provider ({}).",
519 err1.to_string()
520 );
521 self.get_provider_gas_oracle_info()
522 .await
523 .map_err(|err2| Error::GasOracle(err1, err2))
524 }
525 }
526 }
527
528 #[tracing::instrument(level = "trace", skip_all)]
529 async fn get_mined_transaction(
530 &self,
531 state: &mut PersistentState,
532 ) -> Result<Option<TransactionReceipt>, Error<M, GO, DB>> {
533 for &hash in &state.submitted_txs {
534 if let Some(receipt) = self
535 .provider
536 .get_transaction_receipt(hash)
537 .await
538 .map_err(Error::Middleware)?
539 {
540 return Ok(Some(receipt));
541 }
542 }
543 Ok(None)
544 }
545
546 #[tracing::instrument(level = "trace", skip_all)]
547 async fn get_nonce(&self, address: Address) -> Result<U256, Error<M, GO, DB>> {
548 self.provider
549 .get_transaction_count(
550 NameOrAddress::Address(address),
551 Some(BlockId::Number(BlockNumber::Pending)),
552 )
553 .await
554 .map_err(Error::Middleware)
555 }
556
557 #[tracing::instrument(level = "trace", skip_all)]
559 async fn raw_transaction(
560 &self,
561 typed_transaction: &TypedTransaction,
562 ) -> Result<(H256, Bytes), Error<M, GO, DB>> {
563 let from = *typed_transaction.from().unwrap();
564 let signature = self
565 .provider
566 .sign_transaction(typed_transaction, from)
567 .await
568 .map_err(Error::Middleware)?;
569 let hash = typed_transaction.hash(&signature);
570 let rlp_data = typed_transaction.rlp_signed(&signature);
571 Ok((hash, rlp_data))
572 }
573
574 #[tracing::instrument(level = "trace", skip_all)]
576 fn get_wait_time(
577 &self,
578 confirmations: usize,
579 transaction_mining_time: Option<Duration>,
580 ) -> Duration {
581 let transaction_mining_time =
582 transaction_mining_time.unwrap_or(self.configuration.transaction_mining_time);
583 let confirmation_time = if confirmations > 0 {
584 confirmations as u32
585 } else {
586 1
587 } * self.configuration.block_time;
588 transaction_mining_time + confirmation_time
589 }
590}
591
592fn is_error<E>(err: &E, s: &str) -> bool
593where
594 E: Debug,
595{
596 format!("{:?}", err).contains(s)
597}