fuel_txpool/
txpool.rs

1use crate::{
2    containers::{
3        dependency::Dependency,
4        price_sort::PriceSort,
5    },
6    service::TxStatusChange,
7    types::*,
8    Config,
9    Error,
10};
11use anyhow::anyhow;
12use fuel_core_interfaces::{
13    common::fuel_tx::{
14        Chargeable,
15        CheckedTransaction,
16        IntoChecked,
17        Transaction,
18        UniqueIdentifier,
19    },
20    model::{
21        ArcPoolTx,
22        FuelBlock,
23        TxInfo,
24    },
25    txpool::{
26        InsertionResult,
27        TxPoolDb,
28    },
29};
30use fuel_metrics::txpool_metrics::TXPOOL_METRICS;
31use std::{
32    cmp::Reverse,
33    collections::HashMap,
34    ops::Deref,
35    sync::Arc,
36};
37use tokio::sync::RwLock;
38
39#[derive(Debug, Clone)]
40pub struct TxPool {
41    by_hash: HashMap<TxId, TxInfo>,
42    by_gas_price: PriceSort,
43    by_dependency: Dependency,
44    config: Config,
45}
46
47impl TxPool {
48    pub fn new(config: Config) -> Self {
49        let max_depth = config.max_depth;
50
51        Self {
52            by_hash: HashMap::new(),
53            by_gas_price: PriceSort::default(),
54            by_dependency: Dependency::new(max_depth, config.utxo_validation),
55            config,
56        }
57    }
58    pub fn txs(&self) -> &HashMap<TxId, TxInfo> {
59        &self.by_hash
60    }
61
62    pub fn dependency(&self) -> &Dependency {
63        &self.by_dependency
64    }
65
66    // this is atomic operation. Return removed(pushed out/replaced) transactions
67    async fn insert_inner(
68        &mut self,
69        // TODO: Pass `&Transaction`
70        tx: Arc<Transaction>,
71        db: &dyn TxPoolDb,
72    ) -> anyhow::Result<InsertionResult> {
73        let current_height = db.current_block_height()?;
74
75        if tx.is_mint() {
76            return Err(Error::NotSupportedTransactionType.into())
77        }
78
79        // verify gas price is at least the minimum
80        self.verify_tx_min_gas_price(&tx)?;
81
82        let tx: CheckedTransaction = if self.config.utxo_validation {
83            tx.deref()
84                .clone()
85                .into_checked(
86                    current_height.into(),
87                    &self.config.chain_config.transaction_parameters,
88                )?
89                .into()
90        } else {
91            tx.deref()
92                .clone()
93                .into_checked_basic(
94                    current_height.into(),
95                    &self.config.chain_config.transaction_parameters,
96                )?
97                .into()
98        };
99
100        let tx = Arc::new(match tx {
101            CheckedTransaction::Script(script) => PoolTransaction::Script(script),
102            CheckedTransaction::Create(create) => PoolTransaction::Create(create),
103            CheckedTransaction::Mint(_) => unreachable!(),
104        });
105
106        if !tx.is_computed() {
107            return Err(Error::NoMetadata.into())
108        }
109
110        // verify max gas is less than block limit
111        if tx.max_gas() > self.config.chain_config.block_gas_limit {
112            return Err(Error::NotInsertedMaxGasLimit {
113                tx_gas: tx.max_gas(),
114                block_limit: self.config.chain_config.block_gas_limit,
115            }
116            .into())
117        }
118
119        // verify predicates
120        if !tx.check_predicates(self.config.chain_config.transaction_parameters) {
121            return Err(anyhow!("transaction predicate verification failed"))
122        }
123
124        if self.by_hash.contains_key(&tx.id()) {
125            return Err(Error::NotInsertedTxKnown.into())
126        }
127
128        let mut max_limit_hit = false;
129        // check if we are hitting limit of pool
130        if self.by_hash.len() >= self.config.max_tx {
131            max_limit_hit = true;
132            // limit is hit, check if we can push out lowest priced tx
133            let lowest_price = self.by_gas_price.lowest_price();
134            if lowest_price >= tx.price() {
135                return Err(Error::NotInsertedLimitHit.into())
136            }
137        }
138        if self.config.metrics {
139            TXPOOL_METRICS
140                .gas_price_histogram
141                .observe(tx.price() as f64);
142
143            TXPOOL_METRICS
144                .tx_size_histogram
145                .observe(tx.metered_bytes_size() as f64);
146        }
147        // check and insert dependency
148        let rem = self.by_dependency.insert(&self.by_hash, db, &tx).await?;
149        self.by_hash.insert(tx.id(), TxInfo::new(tx.clone()));
150        self.by_gas_price.insert(&tx);
151
152        // if some transaction were removed so we don't need to check limit
153        let removed = if rem.is_empty() {
154            if max_limit_hit {
155                // remove last tx from sort
156                let rem_tx = self.by_gas_price.last().unwrap(); // safe to unwrap limit is hit
157                self.remove_inner(&rem_tx);
158                vec![rem_tx]
159            } else {
160                Vec::new()
161            }
162        } else {
163            // remove ret from by_hash and from by_price
164            for rem in rem.iter() {
165                self.by_hash
166                    .remove(&rem.id())
167                    .expect("Expect to hash of tx to be present");
168                self.by_gas_price.remove(rem);
169            }
170
171            rem
172        };
173
174        Ok(InsertionResult {
175            inserted: tx,
176            removed,
177        })
178    }
179
180    /// Return all sorted transactions that are includable in next block.
181    pub fn sorted_includable(&self) -> Vec<ArcPoolTx> {
182        self.by_gas_price
183            .sort
184            .iter()
185            .rev()
186            .map(|(_, tx)| tx.clone())
187            .collect()
188    }
189
190    pub fn remove_inner(&mut self, tx: &ArcPoolTx) -> Vec<ArcPoolTx> {
191        self.remove_by_tx_id(&tx.id())
192    }
193
194    /// remove transaction from pool needed on user demand. Low priority
195    pub fn remove_by_tx_id(&mut self, tx_id: &TxId) -> Vec<ArcPoolTx> {
196        if let Some(tx) = self.by_hash.remove(tx_id) {
197            let removed = self
198                .by_dependency
199                .recursively_remove_all_dependencies(&self.by_hash, tx.tx().clone());
200            for remove in removed.iter() {
201                self.by_gas_price.remove(remove);
202                self.by_hash.remove(&remove.id());
203            }
204            return removed
205        }
206        Vec::new()
207    }
208
209    fn verify_tx_min_gas_price(&mut self, tx: &Transaction) -> Result<(), Error> {
210        let price = match tx {
211            Transaction::Script(script) => script.price(),
212            Transaction::Create(create) => create.price(),
213            Transaction::Mint(_) => unreachable!(),
214        };
215        if self.config.metrics {
216            // Gas Price metrics are recorded here to avoid double matching for
217            // every single transaction, but also means metrics aren't collected on gas
218            // price if there is no minimum gas price
219            TXPOOL_METRICS.gas_price_histogram.observe(price as f64);
220        }
221        if price < self.config.min_gas_price {
222            return Err(Error::NotInsertedGasPriceTooLow)
223        }
224        Ok(())
225    }
226
227    /// Import a set of transactions from network gossip or GraphQL endpoints.
228    pub async fn insert(
229        txpool: &RwLock<Self>,
230        db: &dyn TxPoolDb,
231        tx_status_sender: &TxStatusChange,
232        txs: &[Arc<Transaction>],
233    ) -> Vec<anyhow::Result<InsertionResult>> {
234        // Check if that data is okay (witness match input/output, and if recovered signatures ara valid).
235        // should be done before transaction comes to txpool, or before it enters RwLocked region.
236        let mut res = Vec::new();
237        for tx in txs.iter() {
238            let mut pool = txpool.write().await;
239            res.push(pool.insert_inner(tx.clone(), db).await)
240        }
241        // announce to subscribers
242        for ret in res.iter() {
243            match ret {
244                Ok(InsertionResult { removed, inserted }) => {
245                    for removed in removed {
246                        // small todo there is possibility to have removal reason (ReplacedByHigherGas, DependencyRemoved)
247                        // but for now it is okay to just use Error::Removed.
248                        tx_status_sender.send_squeezed_out(removed.id(), Error::Removed);
249                    }
250                    tx_status_sender.send_submitted(inserted.id());
251                }
252                Err(_) => {
253                    // @dev should not broadcast tx if error occurred
254                }
255            }
256        }
257        res
258    }
259
260    /// find all tx by its hash
261    pub async fn find(txpool: &RwLock<Self>, hashes: &[TxId]) -> Vec<Option<TxInfo>> {
262        let mut res = Vec::with_capacity(hashes.len());
263        let pool = txpool.read().await;
264        for hash in hashes {
265            res.push(pool.txs().get(hash).cloned());
266        }
267        res
268    }
269
270    pub async fn find_one(txpool: &RwLock<Self>, hash: &TxId) -> Option<TxInfo> {
271        txpool.read().await.txs().get(hash).cloned()
272    }
273
274    /// find all dependent tx and return them with requested dependencies in one list sorted by Price.
275    pub async fn find_dependent(
276        txpool: &RwLock<Self>,
277        hashes: &[TxId],
278    ) -> Vec<ArcPoolTx> {
279        let mut seen = HashMap::new();
280        {
281            let pool = txpool.read().await;
282            for hash in hashes {
283                if let Some(tx) = pool.txs().get(hash) {
284                    pool.dependency().find_dependent(
285                        tx.tx().clone(),
286                        &mut seen,
287                        pool.txs(),
288                    );
289                }
290            }
291        }
292        let mut list: Vec<ArcPoolTx> = seen.into_iter().map(|(_, tx)| tx).collect();
293        // sort from high to low price
294        list.sort_by_key(|tx| Reverse(tx.price()));
295
296        list
297    }
298
299    /// Iterate over `hashes` and return all hashes that we don't have.
300    pub async fn filter_by_negative(txpool: &RwLock<Self>, tx_ids: &[TxId]) -> Vec<TxId> {
301        let mut res = Vec::new();
302        let pool = txpool.read().await;
303        for tx_id in tx_ids {
304            if pool.txs().get(tx_id).is_none() {
305                res.push(*tx_id)
306            }
307        }
308        res
309    }
310
311    /// The number of pending transaction in the pool.
312    pub async fn pending_number(txpool: &RwLock<Self>) -> usize {
313        let pool = txpool.read().await;
314        pool.by_hash.len()
315    }
316
317    /// The amount of gas in all includable transactions combined
318    pub async fn consumable_gas(txpool: &RwLock<Self>) -> u64 {
319        let pool = txpool.read().await;
320        pool.by_hash.values().map(|tx| tx.limit()).sum()
321    }
322
323    /// Return all sorted transactions that are includable in next block.
324    /// This is going to be heavy operation, use it only when needed.
325    pub async fn includable(txpool: &RwLock<Self>) -> Vec<ArcPoolTx> {
326        let pool = txpool.read().await;
327        pool.sorted_includable()
328    }
329
330    /// When block is updated we need to receive all spend outputs and remove them from txpool.
331    pub async fn block_update(
332        txpool: &RwLock<Self>,
333        tx_status_sender: &TxStatusChange,
334        block: Arc<FuelBlock>,
335        // spend_outputs: [Input], added_outputs: [AddedOutputs]
336    ) {
337        let mut guard = txpool.write().await;
338        // TODO https://github.com/FuelLabs/fuel-core/issues/465
339
340        for tx in block.transactions() {
341            tx_status_sender.send_complete(tx.id());
342            let _removed = guard.remove_by_tx_id(&tx.id());
343        }
344    }
345
346    /// remove transaction from pool needed on user demand. Low priority
347    pub async fn remove(
348        txpool: &RwLock<Self>,
349        tx_status_sender: &TxStatusChange,
350        tx_ids: &[TxId],
351    ) -> Vec<ArcPoolTx> {
352        let mut removed = Vec::new();
353        for tx_id in tx_ids {
354            let rem = { txpool.write().await.remove_by_tx_id(tx_id) };
355            tx_status_sender.send_squeezed_out(*tx_id, Error::Removed);
356            removed.extend(rem.into_iter());
357        }
358        removed
359    }
360}
361
362#[cfg(test)]
363mod test_helpers;
364#[cfg(test)]
365mod tests;