eth_tx_manager/
manager.rs

1use async_recursion::async_recursion;
2use ethers::{
3    providers::Middleware,
4    types::{
5        transaction::eip2718::TypedTransaction, Address, BlockId, BlockNumber, Bytes,
6        NameOrAddress, TransactionReceipt, H256, U256,
7    },
8};
9
10use std::default::Default;
11use std::fmt::Debug;
12use std::time::{Duration, Instant};
13use tracing::{error, info, trace, warn};
14
15use crate::gas_oracle::{GasInfo, GasOracle, GasOracleInfo, LegacyGasInfo};
16use crate::time::{DefaultTime, Time};
17use crate::transaction::{PersistentState, Priority, StaticTxData, SubmittedTxs, Transaction};
18use crate::{database::Database, gas_oracle::EIP1559GasInfo};
19
// Default values.

/// Default for `Configuration::transaction_mining_time` (60 seconds), used by
/// `Configuration::default`.
const TRANSACTION_MINING_TIME: Duration = Duration::from_secs(60);
/// Default for `Configuration::block_time` (20 seconds), used by
/// `Configuration::default`.
const BLOCK_TIME: Duration = Duration::from_secs(20);
23
24// ------------------------------------------------------------------------------------------------
25// Error
26// ------------------------------------------------------------------------------------------------
27
/// Errors returned by the transaction manager.
///
/// The enum is generic over the middleware, gas oracle and database so that it
/// can carry their associated error types.
#[derive(Debug, thiserror::Error)]
pub enum Error<M: Middleware, GO: GasOracle, DB: Database> {
    /// A call to the provider (middleware) failed.
    #[error("middleware: {0}")]
    Middleware(M::Error),

    /// Reading, writing or clearing the persisted state failed.
    #[error("database: {0}")]
    Database(DB::Error),

    /// The gas oracle failed (first error) and the fallback to the provider
    /// also failed (second error).
    #[error("gas oracle: error1 = ({0}), error2 = ({1})")]
    GasOracle(GO::Error, M::Error),

    /// The sender's current nonce is greater than the nonce of the
    /// transaction found in the persisted state.
    #[error("nonce too low (expected: {expected_nonce}, current: {current_nonce})")]
    NonceTooLow {
        /// Nonce currently reported by the provider for the sender.
        current_nonce: U256,
        /// Nonce stored with the persisted transaction.
        expected_nonce: U256,
    },

    /// The provider returned no block when asked for the latest one.
    #[error("internal error: latest block is none")]
    LatestBlockIsNone,

    /// The latest block returned by the provider carries no base fee.
    #[error("internal error: latest base fee is none")]
    LatestBaseFeeIsNone,

    // NOTE(review): this variant is not constructed in this file; presumably
    // it is produced elsewhere in the crate — confirm.
    #[error("internal error: incompatible gas oracle ({0})")]
    IncompatibleGasOracle(&'static str),
}
54
55// ------------------------------------------------------------------------------------------------
56// Configuration
57// ------------------------------------------------------------------------------------------------
58
/// Timing parameters of the transaction manager, plus the dependency it uses
/// to sleep and to measure elapsed time.
#[derive(Clone, Debug)]
pub struct Configuration<T: Time> {
    /// Time it takes for a transaction to be mined by a block after being sent
    /// to the transaction pool.
    pub transaction_mining_time: Duration,

    /// Time it takes for a block to be mined. The transaction manager uses this
    /// value to calculate the polling interval when checking whether the
    /// transaction was mined.
    pub block_time: Duration,

    /// Dependency that handles process sleeping and calculating elapsed time.
    pub time: T,
}
73
74impl<T: Time> Configuration<T> {
75    pub fn set_transaction_mining_time(
76        mut self,
77        transaction_mining_time: Duration,
78    ) -> Configuration<T> {
79        self.transaction_mining_time = transaction_mining_time;
80        self
81    }
82
83    pub fn set_block_time(mut self, block_time: Duration) -> Configuration<T> {
84        self.block_time = block_time;
85        self
86    }
87
88    pub fn set_time(mut self, time: T) -> Configuration<T> {
89        self.time = time;
90        self
91    }
92}
93
94impl Default for Configuration<DefaultTime> {
95    fn default() -> Self {
96        Self {
97            transaction_mining_time: TRANSACTION_MINING_TIME,
98            block_time: BLOCK_TIME,
99            time: DefaultTime,
100        }
101    }
102}
103
104// ------------------------------------------------------------------------------------------------
105// Chain
106// ------------------------------------------------------------------------------------------------
107
/// Describes the target chain: its identifier and whether it is a legacy
/// (non-EIP1559) chain.
#[derive(Copy, Clone, Debug)]
pub struct Chain {
    /// Chain identifier.
    pub id: u64,
    /// `true` for chains that do not implement the EIP1559.
    pub is_legacy: bool,
}

impl Chain {
    /// Creates the descriptor of a chain that implements the EIP1559.
    pub fn new(id: u64) -> Self {
        Chain {
            id,
            is_legacy: false,
        }
    }

    /// Creates the descriptor of a chain that does not implement the EIP1559.
    pub fn legacy(id: u64) -> Self {
        Chain {
            is_legacy: true,
            ..Chain::new(id)
        }
    }
}
131
132// ------------------------------------------------------------------------------------------------
133// Manager
134// ------------------------------------------------------------------------------------------------
135
/// Sends transactions and waits for their confirmation, persisting the state
/// of the in-flight transaction in a database.
#[derive(Debug)]
pub struct Manager<M: Middleware, GO: GasOracle, DB: Database, T: Time> {
    /// Middleware used to query the chain and to sign and submit transactions.
    provider: M,
    /// Queried first for gas information; the provider is the fallback when
    /// it fails.
    gas_oracle: GO,
    /// Stores the persistent state of the transaction being sent.
    db: DB,
    /// Target chain, used when building typed transactions and when choosing
    /// between legacy and EIP1559 gas estimation.
    chain: Chain,
    /// Timing parameters and time dependency.
    configuration: Configuration<T>,
}
144
145/// Public functions.
146impl<M: Middleware, GO: GasOracle, DB: Database, T: Time> Manager<M, GO, DB, T>
147where
148    M: Send + Sync,
149    GO: Send + Sync,
150    DB: Send + Sync,
151    T: Send + Sync,
152{
153    /// Sends and confirms any pending transaction persisted in the database
154    /// before returning an instance of the transaction manager. In case a
155    /// pending transaction was mined, it's receipt is also returned.
156    #[tracing::instrument(level = "trace", skip_all)]
157    pub async fn new(
158        provider: M,
159        gas_oracle: GO,
160        db: DB,
161        chain: Chain,
162        configuration: Configuration<T>,
163    ) -> Result<(Self, Option<TransactionReceipt>), Error<M, GO, DB>> {
164        let mut manager = Self {
165            provider,
166            gas_oracle,
167            db,
168            chain,
169            configuration,
170        };
171
172        trace!("Instantiating a new transaction manager => {:#?}", manager);
173
174        let transaction_receipt = match manager.db.get_state().await.map_err(Error::Database)? {
175            Some(mut state) => {
176                warn!("Dealing with previous state => {:#?}", state);
177
178                {
179                    let current_nonce = manager.get_nonce(state.tx_data.transaction.from).await?;
180                    let expected_nonce = state.tx_data.nonce;
181
182                    if current_nonce > expected_nonce {
183                        error!(
184                            "Nonce too low! Current is `{}`, expected `{}`",
185                            current_nonce, expected_nonce
186                        );
187
188                        return Err(Error::NonceTooLow {
189                            current_nonce,
190                            expected_nonce,
191                        });
192                    }
193                }
194
195                let wait_time = manager.get_wait_time(state.tx_data.confirmations, None);
196                let transaction_receipt = manager
197                    .confirm_transaction(&mut state, wait_time, false)
198                    .await?;
199                manager.db.clear_state().await.map_err(Error::Database)?;
200                Some(transaction_receipt)
201            }
202
203            None => None,
204        };
205
206        Ok((manager, transaction_receipt))
207    }
208
209    #[tracing::instrument(level = "trace", skip_all)]
210    pub async fn force_new(
211        provider: M,
212        gas_oracle: GO,
213        db: DB,
214        chain: Chain,
215        configuration: Configuration<T>,
216    ) -> Result<Self, Error<M, GO, DB>> {
217        let mut manager = Self {
218            provider,
219            gas_oracle,
220            db,
221            chain,
222            configuration,
223        };
224
225        trace!(
226            "Forcing the instantiation of a new transaction manager => {:#?}",
227            manager
228        );
229
230        trace!("Clearing DB state");
231        manager.db.clear_state().await.map_err(Error::Database)?;
232
233        Ok(manager)
234    }
235
236    /// Sends a transaction and returns the receipt.
237    #[tracing::instrument(level = "trace", skip_all)]
238    pub async fn send_transaction(
239        mut self,
240        transaction: Transaction,
241        confirmations: usize,
242        priority: Priority,
243    ) -> Result<(Self, TransactionReceipt), Error<M, GO, DB>> {
244        trace!("Sending the transaction.");
245
246        let mut state = {
247            let nonce = self.get_nonce(transaction.from).await?;
248
249            let tx_data = StaticTxData {
250                transaction,
251                nonce,
252                confirmations,
253                priority,
254            };
255
256            let submitted_txs = SubmittedTxs::new();
257
258            PersistentState {
259                tx_data,
260                submitted_txs,
261            }
262        };
263
264        let receipt = self.send_then_confirm_transaction(&mut state).await?;
265
266        info!(
267            "Transaction with nonce {:?} was sent. Transaction hash = {:?}.",
268            state.tx_data.nonce, receipt.transaction_hash
269        );
270
271        // Clearing information about the transaction in the database.
272        self.db.clear_state().await.map_err(Error::Database)?;
273
274        Ok((self, receipt))
275    }
276}
277
278impl<M: Middleware, GO: GasOracle, DB: Database, T: Time> Manager<M, GO, DB, T>
279where
280    M: Send + Sync,
281    GO: Send + Sync,
282    DB: Send + Sync,
283    T: Send + Sync,
284{
    /// Builds, signs and submits the transaction described by `state`, then
    /// waits for its confirmation.
    ///
    /// Gas information is fetched on every call. The hash of each newly
    /// signed transaction is added to `state.submitted_txs` and the state is
    /// written to the database before the transaction is sent to the
    /// provider. The "replacement transaction underpriced" and "already
    /// known" submission errors are tolerated; any other submission error is
    /// returned.
    ///
    /// This function and `confirm_transaction` call each other:
    /// `confirm_transaction` comes back here when its wait time runs out.
    ///
    /// # Panics
    ///
    /// Panics if the hash reported by the provider differs from the locally
    /// computed one, or if a tolerated submission error happens while
    /// `state.submitted_txs` is empty.
    #[async_recursion]
    #[tracing::instrument(level = "trace", skip_all)]
    async fn send_then_confirm_transaction(
        &mut self,
        state: &mut PersistentState,
    ) -> Result<TransactionReceipt, Error<M, GO, DB>> {
        trace!("(Re)sending the transaction.");

        // Estimating gas prices.
        let gas_oracle_info = self.get_gas_oracle_info(state.tx_data.priority).await?;

        // Overwriting the default block time and calculating the wait time.
        // Note that the overwritten block time stays in the configuration for
        // later calls.
        if let Some(block_time) = gas_oracle_info.block_time {
            self.configuration.block_time = block_time;
        }
        let wait_time =
            self.get_wait_time(state.tx_data.confirmations, gas_oracle_info.mining_time);

        // Creating the transaction request.
        let typed_transaction: TypedTransaction = {
            let mut typed_transaction = state
                .tx_data
                .to_typed_transaction(&self.chain, gas_oracle_info.gas_info);

            // Estimating the gas limit of the transaction.
            // FIXME: "insufficient funds for transfer" is detected here!
            typed_transaction.set_gas(
                self.provider
                    .estimate_gas(&typed_transaction, None)
                    .await
                    .map_err(Error::Middleware)?,
            );

            typed_transaction
        };

        {
            // Calculating the transaction hash.
            let (transaction_hash, raw_transaction) =
                self.raw_transaction(&typed_transaction).await?;

            // Checking for the "already known" transactions.
            if !state.submitted_txs.contains(transaction_hash) {
                // Storing information about the pending transaction in the database.
                // This happens before sending, so the hash is recorded even if
                // the submission below fails.
                state.submitted_txs.add(transaction_hash);
                self.db.set_state(state).await.map_err(Error::Database)?;
            }

            // Sending the transaction.
            let result = self
                .provider
                .send_raw_transaction(raw_transaction)
                .await
                .map_err(Error::Middleware);

            match result {
                Ok(pending_transaction) => {
                    assert_eq!(
                        transaction_hash,
                        H256(*pending_transaction.as_fixed_bytes()),
                        "stored hash is different from the pending transaction's hash"
                    );
                    info!(
                        "The manager has submitted transaction with hash {:?} \
                        to the transaction pool, for a total of {:?} submitted \
                        transaction(s).",
                        transaction_hash,
                        state.submitted_txs.len()
                    );
                }
                Err(err) => {
                    // Errors are classified by searching their debug
                    // representation for a known message.
                    if is_error(&err, "replacement transaction underpriced") {
                        assert!(!state.submitted_txs.is_empty());
                        warn!("Tried to send an underpriced transaction.");
                        /* goes back to confirm_transaction */
                    } else if is_error(&err, "already known") {
                        assert!(!state.submitted_txs.is_empty());
                        warn!("Tried to send an already known transaction.");
                        /* goes back to confirm_transaction */
                    } else {
                        error!("Error while submitting transaction: {:?}", err);
                        return Err(err);
                    }
                }
            };
        };

        // Confirming the transaction.
        self.confirm_transaction(state, wait_time, true).await
    }
375
376    #[tracing::instrument(level = "trace", skip_all)]
377    async fn confirm_transaction(
378        &mut self,
379        state: &mut PersistentState,
380        wait_time: Duration,
381        sleep_first: bool,
382    ) -> Result<TransactionReceipt, Error<M, GO, DB>> {
383        trace!(
384            "Confirming transaction (nonce = {:?}).",
385            state.tx_data.nonce
386        );
387
388        let start_time = Instant::now();
389        let mut sleep_time = if sleep_first {
390            self.configuration.block_time
391        } else {
392            Duration::ZERO
393        };
394
395        loop {
396            // Sleeping.
397            self.configuration.time.sleep(sleep_time).await;
398
399            // Were any of the transactions mined?
400            trace!("Were any of the transactions mined?");
401            let receipt = self.get_mined_transaction(state).await?;
402
403            match receipt {
404                Some(receipt) => {
405                    let transaction_block = receipt.block_number.unwrap().as_usize();
406                    let current_block = self
407                        .provider
408                        .get_block_number()
409                        .await
410                        .map_err(Error::Middleware)?
411                        .as_usize();
412
413                    trace!("Mined transaction block: {:?}.", transaction_block);
414                    trace!("Current block: {:?}.", current_block);
415
416                    // Are there enough confirmations?
417                    assert!(current_block >= transaction_block);
418                    let mut delta = (current_block - transaction_block) as i32;
419                    delta = (state.tx_data.confirmations as i32) - delta;
420                    trace!("{:?} more confirmation(s) required.", delta);
421                    if delta <= 0 {
422                        return Ok(receipt);
423                    }
424                }
425                None => {
426                    trace!("No transaction mined.");
427
428                    // Have I waited too much?
429                    let elapsed_time = self.configuration.time.elapsed(start_time);
430                    if elapsed_time > wait_time {
431                        trace!(
432                            "I have waited too much! (elapsed = {:?}, max = {:?})",
433                            elapsed_time,
434                            wait_time
435                        );
436                        return self.send_then_confirm_transaction(state).await;
437                    }
438                }
439            }
440
441            sleep_time = self.configuration.block_time;
442        }
443    }
444
445    /// Retrieves the gas_price (legacy) or max_fee and max_priority_fee
446    /// (EIP1559) from the provider and packs it inside GasOracleInfo.
447    #[tracing::instrument(level = "trace", skip_all)]
448    async fn get_provider_gas_oracle_info(&self) -> Result<GasOracleInfo, M::Error> {
449        let gas_info = if self.chain.is_legacy {
450            trace!("Calculating legacy gas price using the provider.");
451            let gas_price = self.provider.get_gas_price().await?;
452            trace!("(gas_price = {:?} wei)", gas_price);
453            GasInfo::Legacy(LegacyGasInfo { gas_price })
454        } else {
455            trace!("Estimating EIP1559 fees with the provider.");
456            let (max_fee, max_priority_fee) = self.provider.estimate_eip1559_fees(None).await?;
457            trace!(
458                "(max_fee = {:?}, max_priority_fee = {:?})",
459                max_fee,
460                max_priority_fee
461            );
462            GasInfo::EIP1559(EIP1559GasInfo {
463                max_fee,
464                max_priority_fee: Some(max_priority_fee),
465            })
466        };
467        Ok(GasOracleInfo {
468            gas_info,
469            mining_time: None,
470            block_time: None,
471        })
472    }
473
474    /// Uses the provider to calculate the max_priority_fee given the max_fee.
475    async fn get_max_priority_fee(&self, max_fee: U256) -> Result<U256, Error<M, GO, DB>> {
476        let base_fee = self
477            .provider
478            .get_block(BlockId::Number(BlockNumber::Latest))
479            .await
480            .map_err(Error::Middleware)?
481            .ok_or(Error::LatestBlockIsNone)?
482            .base_fee_per_gas
483            .ok_or(Error::LatestBaseFeeIsNone)?;
484
485        assert!(
486            max_fee > base_fee,
487            "max_fee({:?}) <= base_fee({:?})",
488            max_fee,
489            base_fee
490        );
491
492        Ok(max_fee - base_fee)
493    }
494
495    /// Retrieves the gas_oracle_info from the gas oracle if there is one, or
496    /// from the provider otherwise.
497    #[tracing::instrument(level = "trace", skip_all)]
498    async fn get_gas_oracle_info(
499        &self,
500        priority: Priority,
501    ) -> Result<GasOracleInfo, Error<M, GO, DB>> {
502        match self.gas_oracle.get_info(priority).await {
503            Ok(mut gas_oracle_info) => {
504                assert_eq!(gas_oracle_info.gas_info.is_legacy(), self.chain.is_legacy);
505
506                if let GasInfo::EIP1559(mut eip1559_gas_info) = gas_oracle_info.gas_info {
507                    if eip1559_gas_info.max_priority_fee.is_none() {
508                        eip1559_gas_info.max_priority_fee =
509                            Some(self.get_max_priority_fee(eip1559_gas_info.max_fee).await?);
510                        gas_oracle_info.gas_info = GasInfo::EIP1559(eip1559_gas_info);
511                    };
512                }
513
514                Ok(gas_oracle_info)
515            }
516            Err(err1) => {
517                trace!(
518                    "Gas oracle has failed and/or is defaulting to the provider ({}).",
519                    err1.to_string()
520                );
521                self.get_provider_gas_oracle_info()
522                    .await
523                    .map_err(|err2| Error::GasOracle(err1, err2))
524            }
525        }
526    }
527
528    #[tracing::instrument(level = "trace", skip_all)]
529    async fn get_mined_transaction(
530        &self,
531        state: &mut PersistentState,
532    ) -> Result<Option<TransactionReceipt>, Error<M, GO, DB>> {
533        for &hash in &state.submitted_txs {
534            if let Some(receipt) = self
535                .provider
536                .get_transaction_receipt(hash)
537                .await
538                .map_err(Error::Middleware)?
539            {
540                return Ok(Some(receipt));
541            }
542        }
543        Ok(None)
544    }
545
546    #[tracing::instrument(level = "trace", skip_all)]
547    async fn get_nonce(&self, address: Address) -> Result<U256, Error<M, GO, DB>> {
548        self.provider
549            .get_transaction_count(
550                NameOrAddress::Address(address),
551                Some(BlockId::Number(BlockNumber::Pending)),
552            )
553            .await
554            .map_err(Error::Middleware)
555    }
556
557    /// Returns the transaction hash and the raw transaction.
558    #[tracing::instrument(level = "trace", skip_all)]
559    async fn raw_transaction(
560        &self,
561        typed_transaction: &TypedTransaction,
562    ) -> Result<(H256, Bytes), Error<M, GO, DB>> {
563        let from = *typed_transaction.from().unwrap();
564        let signature = self
565            .provider
566            .sign_transaction(typed_transaction, from)
567            .await
568            .map_err(Error::Middleware)?;
569        let hash = typed_transaction.hash(&signature);
570        let rlp_data = typed_transaction.rlp_signed(&signature);
571        Ok((hash, rlp_data))
572    }
573
574    /// TODO: docs.
575    #[tracing::instrument(level = "trace", skip_all)]
576    fn get_wait_time(
577        &self,
578        confirmations: usize,
579        transaction_mining_time: Option<Duration>,
580    ) -> Duration {
581        let transaction_mining_time =
582            transaction_mining_time.unwrap_or(self.configuration.transaction_mining_time);
583        let confirmation_time = if confirmations > 0 {
584            confirmations as u32
585        } else {
586            1
587        } * self.configuration.block_time;
588        transaction_mining_time + confirmation_time
589    }
590}
591
/// Tells whether the debug representation of `err` contains the text `s`.
fn is_error<E>(err: &E, s: &str) -> bool
where
    E: Debug,
{
    let rendered = format!("{:?}", err);
    rendered.find(s).is_some()
}
597}