fuel_core_txpool/
pool.rs

1mod collisions;
2
3use core::num::NonZeroUsize;
4use std::{
5    collections::HashMap,
6    iter,
7    sync::Arc,
8    time::{
9        Instant,
10        SystemTime,
11    },
12};
13
14use collisions::CollisionsExt;
15use fuel_core_metrics::txpool_metrics::txpool_metrics;
16use fuel_core_types::{
17    fuel_tx::{
18        TxId,
19        field::BlobId,
20    },
21    services::{
22        transaction_status::{
23            TransactionStatus,
24            statuses,
25        },
26        txpool::{
27            ArcPoolTx,
28            PoolTransaction,
29        },
30    },
31    tai64::Tai64,
32};
33use num_rational::Ratio;
34
35use crate::{
36    collision_manager::{
37        CollisionManager,
38        Collisions,
39    },
40    config::Config,
41    error::{
42        DependencyError,
43        Error,
44        InputValidationError,
45        InsertionErrorType,
46    },
47    extracted_outputs::ExtractedOutputs,
48    ports::{
49        TxPoolPersistentStorage,
50        TxStatusManager as TxStatusManagerTrait,
51    },
52    selection_algorithms::{
53        Constraints,
54        SelectionAlgorithm,
55    },
56    storage::{
57        CheckedTransaction,
58        Storage,
59        StorageData,
60    },
61};
62
63use crate::{
64    error::RemovedReason,
65    spent_inputs::SpentInputs,
66};
67#[cfg(test)]
68use std::collections::HashSet;
69
70#[derive(Debug, Clone, Copy, Default)]
71pub struct TxPoolStats {
72    pub tx_count: u64,
73    pub total_size: u64,
74    pub total_gas: u64,
75}
76
77/// The pool is the main component of the txpool service. It is responsible for storing transactions
78/// and allowing the selection of transactions for inclusion in a block.
79pub struct Pool<S, SI, CM, SA, TxStatusManager> {
80    /// Configuration of the pool.
81    pub(crate) config: Config,
82    /// The storage of the pool.
83    pub(crate) storage: S,
84    /// The collision manager of the pool.
85    pub(crate) collision_manager: CM,
86    /// The selection algorithm of the pool.
87    pub(crate) selection_algorithm: SA,
88    /// Mapping from tx_id to storage_id.
89    pub(crate) tx_id_to_storage_id: HashMap<TxId, SI>,
90    /// All sent outputs when transactions are extracted. Clear when processing a block.
91    pub(crate) extracted_outputs: ExtractedOutputs,
92    /// The spent inputs cache.
93    pub(crate) spent_inputs: SpentInputs,
94    /// Current pool gas stored.
95    pub(crate) current_gas: u64,
96    /// Current pool size in bytes.
97    pub(crate) current_bytes_size: usize,
98    /// The current pool gas.
99    pub(crate) pool_stats_sender: tokio::sync::watch::Sender<TxPoolStats>,
100    /// New executable transactions notifier.
101    pub(crate) new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
102    /// Transaction status manager.
103    pub(crate) tx_status_manager: Arc<TxStatusManager>,
104}
105
106impl<S, SI, CM, SA, TxStatusManager> Pool<S, SI, CM, SA, TxStatusManager> {
107    /// Create a new pool.
108    pub fn new(
109        storage: S,
110        collision_manager: CM,
111        selection_algorithm: SA,
112        config: Config,
113        pool_stats_sender: tokio::sync::watch::Sender<TxPoolStats>,
114        new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
115        tx_status_manager: Arc<TxStatusManager>,
116    ) -> Self {
117        let capacity = NonZeroUsize::new(config.pool_limits.max_txs.saturating_add(1))
118            .expect("Max txs is greater than 0");
119        let spent_inputs = SpentInputs::new(capacity);
120        Pool {
121            storage,
122            collision_manager,
123            selection_algorithm,
124            config,
125            tx_id_to_storage_id: HashMap::new(),
126            extracted_outputs: ExtractedOutputs::new(),
127            spent_inputs,
128            current_gas: 0,
129            current_bytes_size: 0,
130            pool_stats_sender,
131            new_executable_txs_notifier,
132            tx_status_manager,
133        }
134    }
135
136    /// Returns the number of transactions in the pool.
137    pub fn tx_count(&self) -> usize {
138        self.tx_id_to_storage_id.len()
139    }
140}
141
142impl<S: Storage, CM, SA, TxStatusManager>
143    Pool<S, S::StorageIndex, CM, SA, TxStatusManager>
144where
145    S: Storage,
146    CM: CollisionManager<StorageIndex = S::StorageIndex>,
147    SA: SelectionAlgorithm<Storage = S, StorageIndex = S::StorageIndex>,
148    TxStatusManager: TxStatusManagerTrait,
149{
150    /// Insert transactions into the pool.
151    /// Returns a list of results for each transaction.
152    pub fn insert(
153        &mut self,
154        tx: ArcPoolTx,
155        persistent_storage: &impl TxPoolPersistentStorage,
156    ) -> Result<(), InsertionErrorType> {
157        let tx_id = tx.id();
158        if self.spent_inputs.is_spent_tx(&tx_id)
159            || persistent_storage
160                .contains_tx(&tx_id)
161                .map_err(|e| Error::Database(format!("{:?}", e)))?
162        {
163            return Err(InsertionErrorType::Error(Error::InputValidation(
164                InputValidationError::DuplicateTxId(tx_id),
165            )))
166        }
167
168        let insertion_result = self.insert_inner(tx, persistent_storage);
169        self.register_transaction_counts();
170        insertion_result
171    }
172
173    fn insert_inner(
174        &mut self,
175        tx: Arc<PoolTransaction>,
176        persistent_storage: &impl TxPoolPersistentStorage,
177    ) -> Result<(), InsertionErrorType> {
178        let CanStoreTransaction {
179            checked_transaction,
180            transactions_to_remove,
181            collisions,
182            _guard,
183        } = self.can_insert_transaction(tx, persistent_storage)?;
184
185        let has_dependencies = !checked_transaction.all_dependencies().is_empty();
186
187        let mut removed_transactions = vec![];
188        for tx in transactions_to_remove {
189            let removed = self.storage.remove_transaction_and_dependents_subtree(tx);
190            self.update_components_and_caches_on_removal(removed.iter());
191            removed_transactions.extend(removed);
192        }
193
194        for collided_tx in collisions.keys() {
195            let removed = self
196                .storage
197                .remove_transaction_and_dependents_subtree(*collided_tx);
198            self.update_components_and_caches_on_removal(removed.iter());
199
200            removed_transactions.extend(removed);
201        }
202
203        let tx = checked_transaction.tx();
204        let tx_id = tx.id();
205        let gas = tx.max_gas();
206        let creation_instant = SystemTime::now();
207        let bytes_size = tx.metered_bytes_size();
208
209        let storage_id = self
210            .storage
211            .store_transaction(checked_transaction, creation_instant);
212
213        self.current_gas = self.current_gas.saturating_add(gas);
214        self.current_bytes_size = self.current_bytes_size.saturating_add(bytes_size);
215        debug_assert!(!self.tx_id_to_storage_id.contains_key(&tx_id));
216        self.tx_id_to_storage_id.insert(tx_id, storage_id);
217
218        if self.config.metrics {
219            txpool_metrics().tx_size.observe(bytes_size as f64);
220        };
221
222        let tx =
223            Storage::get(&self.storage, &storage_id).expect("Transaction is set above");
224        self.collision_manager.on_stored_transaction(storage_id, tx);
225
226        let duration = i64::try_from(
227            creation_instant
228                .duration_since(SystemTime::UNIX_EPOCH)
229                .expect("Time can't be less than UNIX EPOCH")
230                .as_secs(),
231        )
232        .expect("Duration is less than i64::MAX");
233        self.tx_status_manager.status_update(
234            tx_id,
235            TransactionStatus::submitted(Tai64::from_unix(duration)),
236        );
237        // No dependencies directly in the graph and the sorted transactions
238        if !has_dependencies {
239            self.selection_algorithm
240                .new_executable_transaction(storage_id, tx);
241            self.new_executable_txs_notifier.send_replace(());
242        }
243
244        let status = statuses::SqueezedOut {
245            reason: Error::Removed(RemovedReason::LessWorth(tx_id)).to_string(),
246        };
247
248        let removed_transactions = removed_transactions
249            .into_iter()
250            .map(|data| {
251                let removed_tx_id = data.transaction.id();
252
253                (removed_tx_id, status.clone())
254            })
255            .collect::<Vec<_>>();
256        if !removed_transactions.is_empty() {
257            self.tx_status_manager
258                .squeezed_out_txs(removed_transactions);
259        }
260
261        self.update_stats();
262        Ok(())
263    }
264
265    fn update_stats(&self) {
266        let _ = self.pool_stats_sender.send(TxPoolStats {
267            tx_count: self.tx_count() as u64,
268            total_size: self.current_bytes_size as u64,
269            total_gas: self.current_gas,
270        });
271    }
272
273    /// Check if a transaction can be inserted into the pool.
274    pub fn can_insert_transaction(
275        &self,
276        tx: ArcPoolTx,
277        persistent_storage: &impl TxPoolPersistentStorage,
278    ) -> Result<CanStoreTransaction<'_, S>, InsertionErrorType> {
279        if tx.max_gas() == 0 {
280            return Err(InsertionErrorType::Error(Error::InputValidation(
281                InputValidationError::MaxGasZero,
282            )))
283        }
284
285        let tx_id = tx.id();
286        if self.tx_id_to_storage_id.contains_key(&tx_id) {
287            return Err(InsertionErrorType::Error(Error::InputValidation(
288                InputValidationError::DuplicateTxId(tx_id),
289            )))
290        }
291
292        self.config
293            .black_list
294            .check_blacklisting(&tx)
295            .map_err(Error::Blacklisted)?;
296
297        Self::check_blob_does_not_exist(&tx, persistent_storage)?;
298        self.storage.validate_inputs(
299            &tx,
300            persistent_storage,
301            &self.extracted_outputs,
302            &self.spent_inputs,
303            self.config.utxo_validation,
304        )?;
305
306        let collisions = self.collision_manager.find_collisions(&tx)?;
307        let checked_transaction = self.storage.can_store_transaction(tx)?;
308
309        for collision in collisions.keys() {
310            if checked_transaction.all_dependencies().contains(collision) {
311                return Err(InsertionErrorType::Error(Error::Dependency(
312                    DependencyError::NotInsertedCollisionIsDependency,
313                )));
314            }
315        }
316
317        let has_dependencies = !checked_transaction.all_dependencies().is_empty();
318
319        collisions
320            .check_collision_requirements(
321                checked_transaction.tx(),
322                has_dependencies,
323                &self.storage,
324            )
325            .map_err(Error::Collided)?;
326
327        let can_fit_into_pool = self.can_fit_into_pool(&checked_transaction)?;
328
329        let mut transactions_to_remove = vec![];
330        if let SpaceCheckResult::NotEnoughSpace(left) = can_fit_into_pool {
331            transactions_to_remove = self.find_free_space(left, &checked_transaction)?;
332        }
333
334        let can_store_transaction = CanStoreTransaction {
335            checked_transaction,
336            transactions_to_remove,
337            collisions,
338            _guard: &self.storage,
339        };
340
341        Ok(can_store_transaction)
342    }
343
344    fn record_transaction_time_in_txpool(tx: &StorageData) {
345        if let Ok(elapsed) = tx.creation_instant.elapsed() {
346            txpool_metrics()
347                .transaction_time_in_txpool_secs
348                .observe(elapsed.as_secs_f64());
349        } else {
350            tracing::warn!("Failed to calculate transaction time in txpool");
351        }
352    }
353
354    fn record_select_transaction_time(start: Instant) {
355        let elapsed = start.elapsed().as_micros() as f64;
356        txpool_metrics()
357            .select_transactions_time_microseconds
358            .observe(elapsed);
359    }
360
361    fn register_transaction_counts(&self) {
362        if self.config.metrics {
363            let num_transactions = self.tx_count();
364            let executable_txs =
365                self.selection_algorithm.number_of_executable_transactions();
366            txpool_metrics()
367                .number_of_transactions
368                .set(num_transactions as i64);
369            txpool_metrics()
370                .number_of_executable_transactions
371                .set(executable_txs as i64);
372        }
373    }
374
375    /// Extract transactions for a block.
376    /// Returns a list of transactions that were selected for the block
377    /// based on the constraints given in the configuration and the selection algorithm used.
378    pub fn extract_transactions_for_block(
379        &mut self,
380        constraints: Constraints,
381    ) -> Vec<ArcPoolTx> {
382        let metrics = self.config.metrics;
383        let maybe_start = metrics.then(std::time::Instant::now);
384        let best_txs = self
385            .selection_algorithm
386            .gather_best_txs(constraints, &mut self.storage);
387
388        if let Some(start) = maybe_start {
389            Self::record_select_transaction_time(start)
390        };
391
392        if metrics {
393            best_txs.iter().for_each(|storage_data| {
394                Self::record_transaction_time_in_txpool(storage_data)
395            });
396        }
397
398        let txs = best_txs
399            .into_iter()
400            .map(|storage_entry| {
401                self.extracted_outputs
402                    .new_extracted_transaction(&storage_entry.transaction);
403                self.spent_inputs.maybe_spend_inputs(
404                    storage_entry.transaction.id(),
405                    storage_entry.transaction.inputs(),
406                );
407                self.update_components_and_caches_on_removal(iter::once(&storage_entry));
408                storage_entry.transaction
409            })
410            .collect::<Vec<_>>();
411
412        self.update_stats();
413
414        txs
415    }
416
417    pub fn get(&self, tx_id: &TxId) -> Option<&StorageData> {
418        Storage::get(&self.storage, self.tx_id_to_storage_id.get(tx_id)?)
419    }
420
421    pub fn contains(&self, tx_id: &TxId) -> bool {
422        self.tx_id_to_storage_id.contains_key(tx_id)
423    }
424
425    pub fn iter_tx_ids(&self) -> impl Iterator<Item = &TxId> {
426        self.tx_id_to_storage_id.keys()
427    }
428
429    /// Process committed transactions:
430    /// - Remove transaction but keep its dependents and the dependents become executables.
431    /// - Notify about possible new executable transactions.
432    pub fn process_committed_transactions(&mut self, tx_ids: impl Iterator<Item = TxId>) {
433        let mut transactions_to_promote = vec![];
434        for tx_id in tx_ids {
435            self.spent_inputs.spend_inputs_by_tx_id(tx_id);
436            if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
437                let dependents: Vec<S::StorageIndex> =
438                    self.storage.get_direct_dependents(storage_id).collect();
439                let Some(transaction) = self.storage.remove_transaction(storage_id)
440                else {
441                    debug_assert!(false, "Storage data not found for the transaction");
442                    tracing::warn!(
443                        "Storage data not found for the transaction during `remove_transaction`."
444                    );
445                    continue
446                };
447                self.extracted_outputs
448                    .new_extracted_transaction(&transaction.transaction);
449                self.spent_inputs
450                    .spend_inputs(tx_id, transaction.transaction.inputs());
451                self.update_components_and_caches_on_removal(iter::once(&transaction));
452
453                for dependent in dependents {
454                    if !self.storage.has_dependencies(&dependent) {
455                        transactions_to_promote.push(dependent);
456                    }
457                }
458            }
459        }
460
461        let mut new_executable_transaction = false;
462        for promote in transactions_to_promote {
463            let Some(storage_data) = self.storage.get(&promote) else {
464                debug_assert!(
465                    false,
466                    "Dependent storage data not found for the transaction"
467                );
468                tracing::warn!(
469                    "Dependent storage data not found for \
470                            the transaction during `remove_transaction`."
471                );
472                continue
473            };
474
475            self.selection_algorithm
476                .new_executable_transaction(promote, storage_data);
477            new_executable_transaction = true;
478        }
479
480        if new_executable_transaction {
481            self.new_executable_txs_notifier.send_replace(());
482        }
483
484        self.update_stats();
485    }
486
487    /// Check if the pool has enough space to store a transaction.
488    ///
489    /// It returns `true` if:
490    /// - Pool is not full
491    /// - Removing colliding subtree is enough to make space
492    ///
493    /// It returns an error if the pool is full and transactions has dependencies.
494    ///
495    /// If none of the above is not true, return `false`.
496    fn can_fit_into_pool(
497        &self,
498        checked_transaction: &S::CheckedTransaction,
499    ) -> Result<SpaceCheckResult, Error> {
500        let tx = checked_transaction.tx();
501        let tx_gas = tx.max_gas();
502        let bytes_size = tx.metered_bytes_size();
503        let gas_left = self.current_gas.saturating_add(tx_gas);
504        let bytes_left = self.current_bytes_size.saturating_add(bytes_size);
505        let txs_left = self.tx_id_to_storage_id.len().saturating_add(1);
506        if gas_left <= self.config.pool_limits.max_gas
507            && bytes_left <= self.config.pool_limits.max_bytes_size
508            && txs_left <= self.config.pool_limits.max_txs
509        {
510            return Ok(SpaceCheckResult::EnoughSpace);
511        }
512
513        let has_dependencies = !checked_transaction.all_dependencies().is_empty();
514
515        // If the transaction has a dependency and the pool is full, we refuse it
516        if has_dependencies {
517            return Err(Error::NotInsertedLimitHit);
518        }
519
520        let left = NotEnoughSpace {
521            gas_left,
522            bytes_left,
523            txs_left,
524        };
525
526        Ok(SpaceCheckResult::NotEnoughSpace(left))
527    }
528
529    /// Find free space in the pool by marking less profitable transactions for removal.
530    ///
531    /// Return the list of transactions that must be removed from the pool along all of
532    /// their dependent subtree.
533    ///
534    /// Returns an error impossible to find enough space.
535    fn find_free_space(
536        &self,
537        left: NotEnoughSpace,
538        checked_transaction: &S::CheckedTransaction,
539    ) -> Result<Vec<S::StorageIndex>, Error> {
540        let tx = checked_transaction.tx();
541        let NotEnoughSpace {
542            mut gas_left,
543            mut bytes_left,
544            mut txs_left,
545        } = left;
546
547        // Here the transaction has no dependencies which means that it's an executable transaction
548        // and we want to make space for it
549        let new_tx_ratio = Ratio::new(tx.tip(), tx.max_gas());
550
551        // We want to go over executable transactions and remove the less profitable ones.
552        // It is imported to go over executable transactions, not dependent ones, because we
553        // can include the same subtree several times in the calculation if we use dependent txs.
554        let mut sorted_txs = self.selection_algorithm.get_less_worth_txs();
555
556        let mut transactions_to_remove = vec![];
557
558        while gas_left > self.config.pool_limits.max_gas
559            || bytes_left > self.config.pool_limits.max_bytes_size
560            || txs_left > self.config.pool_limits.max_txs
561        {
562            let storage_id = sorted_txs.next().ok_or(Error::NotInsertedLimitHit)?;
563
564            if checked_transaction.all_dependencies().contains(storage_id) {
565                continue
566            }
567
568            debug_assert!(!self.storage.has_dependencies(storage_id));
569
570            let Some(storage_data) = self.storage.get(storage_id) else {
571                debug_assert!(
572                    false,
573                    "Storage data not found for one of the less worth transactions"
574                );
575                tracing::warn!(
576                    "Storage data not found for one of the less \
577                    worth transactions during `find_free_space`."
578                );
579                continue
580            };
581            let ratio = Ratio::new(
582                storage_data.dependents_cumulative_tip,
583                storage_data.dependents_cumulative_gas,
584            );
585
586            if ratio > new_tx_ratio {
587                return Err(Error::NotInsertedLimitHit);
588            }
589
590            // It is not a correct way of deciding how many gas and bytes we will free
591            // by removing this transaction, but the fastest way.
592            // It will return incorrect values if several transactions have the same
593            // dependents. An example:
594            //
595            // E - Executable transaction
596            // D - Dependent transaction
597            //
598            //   E   E
599            //    \ /
600            //     D
601            //    / \
602            //   D   D
603            //
604            // Removing both E frees (E + E + 3D), while this loop assumes 2 * (E + 3D).
605            // But it is okay since the limits for the TxPool are not strict.
606            let gas = storage_data.dependents_cumulative_gas;
607            let bytes = storage_data.dependents_cumulative_bytes_size;
608            let txs = storage_data.number_dependents_in_chain;
609
610            gas_left = gas_left.saturating_sub(gas);
611            bytes_left = bytes_left.saturating_sub(bytes);
612            txs_left = txs_left.saturating_sub(txs);
613
614            transactions_to_remove.push(*storage_id);
615        }
616
617        Ok(transactions_to_remove)
618    }
619
620    /// Remove transaction and its dependents.
621    pub fn remove_transactions_and_dependents<I>(
622        &mut self,
623        tx_ids: I,
624        tx_status: statuses::SqueezedOut,
625    ) where
626        I: IntoIterator<Item = TxId>,
627    {
628        let mut removed_transactions = vec![];
629        for tx_id in tx_ids {
630            if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
631                let removed = self
632                    .storage
633                    .remove_transaction_and_dependents_subtree(storage_id);
634                self.update_components_and_caches_on_removal(removed.iter());
635                removed_transactions.extend(removed.into_iter().map(|data| {
636                    let tx_id = data.transaction.id();
637
638                    (tx_id, tx_status.clone())
639                }));
640            }
641        }
642        if !removed_transactions.is_empty() {
643            self.tx_status_manager
644                .squeezed_out_txs(removed_transactions);
645        }
646        self.update_stats();
647    }
648
649    pub fn remove_skipped_transaction(&mut self, tx_id: TxId, reason: String) {
650        // If pre confirmation comes from the block producer node via p2p,
651        // transaction still may be inside of the TxPool.
652        // In that case first remove it and all its dependants.
653        if self.tx_id_to_storage_id.contains_key(&tx_id) {
654            let tx_status = statuses::SqueezedOut {
655                reason: Error::SkippedTransaction(format!(
656                    "Transaction with id: {tx_id}, was removed because of: {reason}"
657                ))
658                .to_string(),
659            };
660            self.remove_transactions_and_dependents(iter::once(tx_id), tx_status);
661        }
662
663        self.extracted_outputs.new_skipped_transaction(&tx_id);
664        self.spent_inputs.unspend_inputs(tx_id);
665
666        let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id);
667        if !coin_dependents.is_empty() {
668            let tx_status = statuses::SqueezedOut {
669                reason: Error::SkippedTransaction(format!(
670                    "Parent transaction with id: {tx_id}, was removed because of: {reason}"
671                ))
672                .to_string(),
673            };
674
675            for dependent in coin_dependents {
676                let removed = self
677                    .storage
678                    .remove_transaction_and_dependents_subtree(dependent);
679                self.update_components_and_caches_on_removal(removed.iter());
680                // It's not needed to inform the status manager about the skipped transaction herself
681                // because he give the information but we need to inform him about the dependents
682                // that will be deleted
683                let removed_txs: Vec<_> = removed
684                    .into_iter()
685                    .map(|data| {
686                        let tx_id = data.transaction.id();
687                        (tx_id, tx_status.clone())
688                    })
689                    .collect();
690                if !removed_txs.is_empty() {
691                    self.tx_status_manager.squeezed_out_txs(removed_txs);
692                }
693            }
694        }
695
696        self.update_stats();
697    }
698
699    fn check_blob_does_not_exist(
700        tx: &PoolTransaction,
701        persistent_storage: &impl TxPoolPersistentStorage,
702    ) -> Result<(), Error> {
703        if let PoolTransaction::Blob(checked_tx, _) = &tx {
704            let blob_id = checked_tx.transaction().blob_id();
705            if persistent_storage
706                .blob_exist(blob_id)
707                .map_err(|e| Error::Database(format!("{:?}", e)))?
708            {
709                return Err(Error::InputValidation(
710                    InputValidationError::NotInsertedBlobIdAlreadyTaken(*blob_id),
711                ));
712            }
713        }
714        Ok(())
715    }
716
717    fn update_components_and_caches_on_removal<'a>(
718        &mut self,
719        removed_transactions: impl Iterator<Item = &'a StorageData>,
720    ) {
721        for storage_entry in removed_transactions {
722            let tx = &storage_entry.transaction;
723            self.current_gas = self.current_gas.saturating_sub(tx.max_gas());
724            self.current_bytes_size = self
725                .current_bytes_size
726                .saturating_sub(tx.metered_bytes_size());
727            self.tx_id_to_storage_id.remove(&tx.id());
728            self.collision_manager.on_removed_transaction(tx);
729            self.selection_algorithm
730                .on_removed_transaction(storage_entry);
731        }
732        self.register_transaction_counts();
733    }
734
735    #[cfg(test)]
736    pub fn assert_integrity(&self, mut expected_txs: HashSet<TxId>) {
737        for tx in &self.tx_id_to_storage_id {
738            if !expected_txs.remove(tx.0) {
739                panic!(
740                    "Transaction with id {:?} is not in the expected transactions",
741                    tx.0
742                );
743            }
744        }
745        assert!(
746            expected_txs.is_empty(),
747            "Some transactions are not found in the pool"
748        );
749    }
750}
751
752pub struct NotEnoughSpace {
753    gas_left: u64,
754    bytes_left: usize,
755    txs_left: usize,
756}
757
758/// The result of the `can_fit_into_pool` check.
759pub enum SpaceCheckResult {
760    EnoughSpace,
761    NotEnoughSpace(NotEnoughSpace),
762}
763
764pub struct CanStoreTransaction<'a, S>
765where
766    S: Storage,
767{
768    /// The checked transaction by the storage.
769    checked_transaction: S::CheckedTransaction,
770    /// List of transactions to remove to fit the new transaction into the pool.
771    transactions_to_remove: Vec<S::StorageIndex>,
772    /// List of collided transactions that we need to remove to insert transaction.
773    collisions: Collisions<S::StorageIndex>,
774    /// Protects the pool from modifications while this type is active.
775    _guard: &'a S,
776}