1use crate::{
2 containers::{
3 dependency::Dependency,
4 price_sort::PriceSort,
5 },
6 service::TxStatusChange,
7 types::*,
8 Config,
9 Error,
10};
11use anyhow::anyhow;
12use fuel_core_interfaces::{
13 common::fuel_tx::{
14 Chargeable,
15 CheckedTransaction,
16 IntoChecked,
17 Transaction,
18 UniqueIdentifier,
19 },
20 model::{
21 ArcPoolTx,
22 FuelBlock,
23 TxInfo,
24 },
25 txpool::{
26 InsertionResult,
27 TxPoolDb,
28 },
29};
30use fuel_metrics::txpool_metrics::TXPOOL_METRICS;
31use std::{
32 cmp::Reverse,
33 collections::HashMap,
34 ops::Deref,
35 sync::Arc,
36};
37use tokio::sync::RwLock;
38
39#[derive(Debug, Clone)]
40pub struct TxPool {
41 by_hash: HashMap<TxId, TxInfo>,
42 by_gas_price: PriceSort,
43 by_dependency: Dependency,
44 config: Config,
45}
46
47impl TxPool {
48 pub fn new(config: Config) -> Self {
49 let max_depth = config.max_depth;
50
51 Self {
52 by_hash: HashMap::new(),
53 by_gas_price: PriceSort::default(),
54 by_dependency: Dependency::new(max_depth, config.utxo_validation),
55 config,
56 }
57 }
58 pub fn txs(&self) -> &HashMap<TxId, TxInfo> {
59 &self.by_hash
60 }
61
62 pub fn dependency(&self) -> &Dependency {
63 &self.by_dependency
64 }
65
66 async fn insert_inner(
68 &mut self,
69 tx: Arc<Transaction>,
71 db: &dyn TxPoolDb,
72 ) -> anyhow::Result<InsertionResult> {
73 let current_height = db.current_block_height()?;
74
75 if tx.is_mint() {
76 return Err(Error::NotSupportedTransactionType.into())
77 }
78
79 self.verify_tx_min_gas_price(&tx)?;
81
82 let tx: CheckedTransaction = if self.config.utxo_validation {
83 tx.deref()
84 .clone()
85 .into_checked(
86 current_height.into(),
87 &self.config.chain_config.transaction_parameters,
88 )?
89 .into()
90 } else {
91 tx.deref()
92 .clone()
93 .into_checked_basic(
94 current_height.into(),
95 &self.config.chain_config.transaction_parameters,
96 )?
97 .into()
98 };
99
100 let tx = Arc::new(match tx {
101 CheckedTransaction::Script(script) => PoolTransaction::Script(script),
102 CheckedTransaction::Create(create) => PoolTransaction::Create(create),
103 CheckedTransaction::Mint(_) => unreachable!(),
104 });
105
106 if !tx.is_computed() {
107 return Err(Error::NoMetadata.into())
108 }
109
110 if tx.max_gas() > self.config.chain_config.block_gas_limit {
112 return Err(Error::NotInsertedMaxGasLimit {
113 tx_gas: tx.max_gas(),
114 block_limit: self.config.chain_config.block_gas_limit,
115 }
116 .into())
117 }
118
119 if !tx.check_predicates(self.config.chain_config.transaction_parameters) {
121 return Err(anyhow!("transaction predicate verification failed"))
122 }
123
124 if self.by_hash.contains_key(&tx.id()) {
125 return Err(Error::NotInsertedTxKnown.into())
126 }
127
128 let mut max_limit_hit = false;
129 if self.by_hash.len() >= self.config.max_tx {
131 max_limit_hit = true;
132 let lowest_price = self.by_gas_price.lowest_price();
134 if lowest_price >= tx.price() {
135 return Err(Error::NotInsertedLimitHit.into())
136 }
137 }
138 if self.config.metrics {
139 TXPOOL_METRICS
140 .gas_price_histogram
141 .observe(tx.price() as f64);
142
143 TXPOOL_METRICS
144 .tx_size_histogram
145 .observe(tx.metered_bytes_size() as f64);
146 }
147 let rem = self.by_dependency.insert(&self.by_hash, db, &tx).await?;
149 self.by_hash.insert(tx.id(), TxInfo::new(tx.clone()));
150 self.by_gas_price.insert(&tx);
151
152 let removed = if rem.is_empty() {
154 if max_limit_hit {
155 let rem_tx = self.by_gas_price.last().unwrap(); self.remove_inner(&rem_tx);
158 vec![rem_tx]
159 } else {
160 Vec::new()
161 }
162 } else {
163 for rem in rem.iter() {
165 self.by_hash
166 .remove(&rem.id())
167 .expect("Expect to hash of tx to be present");
168 self.by_gas_price.remove(rem);
169 }
170
171 rem
172 };
173
174 Ok(InsertionResult {
175 inserted: tx,
176 removed,
177 })
178 }
179
180 pub fn sorted_includable(&self) -> Vec<ArcPoolTx> {
182 self.by_gas_price
183 .sort
184 .iter()
185 .rev()
186 .map(|(_, tx)| tx.clone())
187 .collect()
188 }
189
190 pub fn remove_inner(&mut self, tx: &ArcPoolTx) -> Vec<ArcPoolTx> {
191 self.remove_by_tx_id(&tx.id())
192 }
193
194 pub fn remove_by_tx_id(&mut self, tx_id: &TxId) -> Vec<ArcPoolTx> {
196 if let Some(tx) = self.by_hash.remove(tx_id) {
197 let removed = self
198 .by_dependency
199 .recursively_remove_all_dependencies(&self.by_hash, tx.tx().clone());
200 for remove in removed.iter() {
201 self.by_gas_price.remove(remove);
202 self.by_hash.remove(&remove.id());
203 }
204 return removed
205 }
206 Vec::new()
207 }
208
209 fn verify_tx_min_gas_price(&mut self, tx: &Transaction) -> Result<(), Error> {
210 let price = match tx {
211 Transaction::Script(script) => script.price(),
212 Transaction::Create(create) => create.price(),
213 Transaction::Mint(_) => unreachable!(),
214 };
215 if self.config.metrics {
216 TXPOOL_METRICS.gas_price_histogram.observe(price as f64);
220 }
221 if price < self.config.min_gas_price {
222 return Err(Error::NotInsertedGasPriceTooLow)
223 }
224 Ok(())
225 }
226
227 pub async fn insert(
229 txpool: &RwLock<Self>,
230 db: &dyn TxPoolDb,
231 tx_status_sender: &TxStatusChange,
232 txs: &[Arc<Transaction>],
233 ) -> Vec<anyhow::Result<InsertionResult>> {
234 let mut res = Vec::new();
237 for tx in txs.iter() {
238 let mut pool = txpool.write().await;
239 res.push(pool.insert_inner(tx.clone(), db).await)
240 }
241 for ret in res.iter() {
243 match ret {
244 Ok(InsertionResult { removed, inserted }) => {
245 for removed in removed {
246 tx_status_sender.send_squeezed_out(removed.id(), Error::Removed);
249 }
250 tx_status_sender.send_submitted(inserted.id());
251 }
252 Err(_) => {
253 }
255 }
256 }
257 res
258 }
259
260 pub async fn find(txpool: &RwLock<Self>, hashes: &[TxId]) -> Vec<Option<TxInfo>> {
262 let mut res = Vec::with_capacity(hashes.len());
263 let pool = txpool.read().await;
264 for hash in hashes {
265 res.push(pool.txs().get(hash).cloned());
266 }
267 res
268 }
269
270 pub async fn find_one(txpool: &RwLock<Self>, hash: &TxId) -> Option<TxInfo> {
271 txpool.read().await.txs().get(hash).cloned()
272 }
273
274 pub async fn find_dependent(
276 txpool: &RwLock<Self>,
277 hashes: &[TxId],
278 ) -> Vec<ArcPoolTx> {
279 let mut seen = HashMap::new();
280 {
281 let pool = txpool.read().await;
282 for hash in hashes {
283 if let Some(tx) = pool.txs().get(hash) {
284 pool.dependency().find_dependent(
285 tx.tx().clone(),
286 &mut seen,
287 pool.txs(),
288 );
289 }
290 }
291 }
292 let mut list: Vec<ArcPoolTx> = seen.into_iter().map(|(_, tx)| tx).collect();
293 list.sort_by_key(|tx| Reverse(tx.price()));
295
296 list
297 }
298
299 pub async fn filter_by_negative(txpool: &RwLock<Self>, tx_ids: &[TxId]) -> Vec<TxId> {
301 let mut res = Vec::new();
302 let pool = txpool.read().await;
303 for tx_id in tx_ids {
304 if pool.txs().get(tx_id).is_none() {
305 res.push(*tx_id)
306 }
307 }
308 res
309 }
310
311 pub async fn pending_number(txpool: &RwLock<Self>) -> usize {
313 let pool = txpool.read().await;
314 pool.by_hash.len()
315 }
316
317 pub async fn consumable_gas(txpool: &RwLock<Self>) -> u64 {
319 let pool = txpool.read().await;
320 pool.by_hash.values().map(|tx| tx.limit()).sum()
321 }
322
323 pub async fn includable(txpool: &RwLock<Self>) -> Vec<ArcPoolTx> {
326 let pool = txpool.read().await;
327 pool.sorted_includable()
328 }
329
330 pub async fn block_update(
332 txpool: &RwLock<Self>,
333 tx_status_sender: &TxStatusChange,
334 block: Arc<FuelBlock>,
335 ) {
337 let mut guard = txpool.write().await;
338 for tx in block.transactions() {
341 tx_status_sender.send_complete(tx.id());
342 let _removed = guard.remove_by_tx_id(&tx.id());
343 }
344 }
345
346 pub async fn remove(
348 txpool: &RwLock<Self>,
349 tx_status_sender: &TxStatusChange,
350 tx_ids: &[TxId],
351 ) -> Vec<ArcPoolTx> {
352 let mut removed = Vec::new();
353 for tx_id in tx_ids {
354 let rem = { txpool.write().await.remove_by_tx_id(tx_id) };
355 tx_status_sender.send_squeezed_out(*tx_id, Error::Removed);
356 removed.extend(rem.into_iter());
357 }
358 removed
359 }
360}
361
362#[cfg(test)]
363mod test_helpers;
364#[cfg(test)]
365mod tests;