1mod collisions;
2
3use core::num::NonZeroUsize;
4use std::{
5 collections::HashMap,
6 iter,
7 sync::Arc,
8 time::{
9 Instant,
10 SystemTime,
11 },
12};
13
14use collisions::CollisionsExt;
15use fuel_core_metrics::txpool_metrics::txpool_metrics;
16use fuel_core_types::{
17 fuel_tx::{
18 TxId,
19 field::BlobId,
20 },
21 services::{
22 transaction_status::{
23 TransactionStatus,
24 statuses,
25 },
26 txpool::{
27 ArcPoolTx,
28 PoolTransaction,
29 },
30 },
31 tai64::Tai64,
32};
33use num_rational::Ratio;
34
35use crate::{
36 collision_manager::{
37 CollisionManager,
38 Collisions,
39 },
40 config::Config,
41 error::{
42 DependencyError,
43 Error,
44 InputValidationError,
45 InsertionErrorType,
46 },
47 extracted_outputs::ExtractedOutputs,
48 ports::{
49 TxPoolPersistentStorage,
50 TxStatusManager as TxStatusManagerTrait,
51 },
52 selection_algorithms::{
53 Constraints,
54 SelectionAlgorithm,
55 },
56 storage::{
57 CheckedTransaction,
58 Storage,
59 StorageData,
60 },
61};
62
63use crate::{
64 error::RemovedReason,
65 spent_inputs::SpentInputs,
66};
67#[cfg(test)]
68use std::collections::HashSet;
69
70#[derive(Debug, Clone, Copy, Default)]
71pub struct TxPoolStats {
72 pub tx_count: u64,
73 pub total_size: u64,
74 pub total_gas: u64,
75}
76
77pub struct Pool<S, SI, CM, SA, TxStatusManager> {
80 pub(crate) config: Config,
82 pub(crate) storage: S,
84 pub(crate) collision_manager: CM,
86 pub(crate) selection_algorithm: SA,
88 pub(crate) tx_id_to_storage_id: HashMap<TxId, SI>,
90 pub(crate) extracted_outputs: ExtractedOutputs,
92 pub(crate) spent_inputs: SpentInputs,
94 pub(crate) current_gas: u64,
96 pub(crate) current_bytes_size: usize,
98 pub(crate) pool_stats_sender: tokio::sync::watch::Sender<TxPoolStats>,
100 pub(crate) new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
102 pub(crate) tx_status_manager: Arc<TxStatusManager>,
104}
105
106impl<S, SI, CM, SA, TxStatusManager> Pool<S, SI, CM, SA, TxStatusManager> {
107 pub fn new(
109 storage: S,
110 collision_manager: CM,
111 selection_algorithm: SA,
112 config: Config,
113 pool_stats_sender: tokio::sync::watch::Sender<TxPoolStats>,
114 new_executable_txs_notifier: tokio::sync::watch::Sender<()>,
115 tx_status_manager: Arc<TxStatusManager>,
116 ) -> Self {
117 let capacity = NonZeroUsize::new(config.pool_limits.max_txs.saturating_add(1))
118 .expect("Max txs is greater than 0");
119 let spent_inputs = SpentInputs::new(capacity);
120 Pool {
121 storage,
122 collision_manager,
123 selection_algorithm,
124 config,
125 tx_id_to_storage_id: HashMap::new(),
126 extracted_outputs: ExtractedOutputs::new(),
127 spent_inputs,
128 current_gas: 0,
129 current_bytes_size: 0,
130 pool_stats_sender,
131 new_executable_txs_notifier,
132 tx_status_manager,
133 }
134 }
135
136 pub fn tx_count(&self) -> usize {
138 self.tx_id_to_storage_id.len()
139 }
140}
141
142impl<S: Storage, CM, SA, TxStatusManager>
143 Pool<S, S::StorageIndex, CM, SA, TxStatusManager>
144where
145 S: Storage,
146 CM: CollisionManager<StorageIndex = S::StorageIndex>,
147 SA: SelectionAlgorithm<Storage = S, StorageIndex = S::StorageIndex>,
148 TxStatusManager: TxStatusManagerTrait,
149{
150 pub fn insert(
153 &mut self,
154 tx: ArcPoolTx,
155 persistent_storage: &impl TxPoolPersistentStorage,
156 ) -> Result<(), InsertionErrorType> {
157 let tx_id = tx.id();
158 if self.spent_inputs.is_spent_tx(&tx_id)
159 || persistent_storage
160 .contains_tx(&tx_id)
161 .map_err(|e| Error::Database(format!("{:?}", e)))?
162 {
163 return Err(InsertionErrorType::Error(Error::InputValidation(
164 InputValidationError::DuplicateTxId(tx_id),
165 )))
166 }
167
168 let insertion_result = self.insert_inner(tx, persistent_storage);
169 self.register_transaction_counts();
170 insertion_result
171 }
172
173 fn insert_inner(
174 &mut self,
175 tx: Arc<PoolTransaction>,
176 persistent_storage: &impl TxPoolPersistentStorage,
177 ) -> Result<(), InsertionErrorType> {
178 let CanStoreTransaction {
179 checked_transaction,
180 transactions_to_remove,
181 collisions,
182 _guard,
183 } = self.can_insert_transaction(tx, persistent_storage)?;
184
185 let has_dependencies = !checked_transaction.all_dependencies().is_empty();
186
187 let mut removed_transactions = vec![];
188 for tx in transactions_to_remove {
189 let removed = self.storage.remove_transaction_and_dependents_subtree(tx);
190 self.update_components_and_caches_on_removal(removed.iter());
191 removed_transactions.extend(removed);
192 }
193
194 for collided_tx in collisions.keys() {
195 let removed = self
196 .storage
197 .remove_transaction_and_dependents_subtree(*collided_tx);
198 self.update_components_and_caches_on_removal(removed.iter());
199
200 removed_transactions.extend(removed);
201 }
202
203 let tx = checked_transaction.tx();
204 let tx_id = tx.id();
205 let gas = tx.max_gas();
206 let creation_instant = SystemTime::now();
207 let bytes_size = tx.metered_bytes_size();
208
209 let storage_id = self
210 .storage
211 .store_transaction(checked_transaction, creation_instant);
212
213 self.current_gas = self.current_gas.saturating_add(gas);
214 self.current_bytes_size = self.current_bytes_size.saturating_add(bytes_size);
215 debug_assert!(!self.tx_id_to_storage_id.contains_key(&tx_id));
216 self.tx_id_to_storage_id.insert(tx_id, storage_id);
217
218 if self.config.metrics {
219 txpool_metrics().tx_size.observe(bytes_size as f64);
220 };
221
222 let tx =
223 Storage::get(&self.storage, &storage_id).expect("Transaction is set above");
224 self.collision_manager.on_stored_transaction(storage_id, tx);
225
226 let duration = i64::try_from(
227 creation_instant
228 .duration_since(SystemTime::UNIX_EPOCH)
229 .expect("Time can't be less than UNIX EPOCH")
230 .as_secs(),
231 )
232 .expect("Duration is less than i64::MAX");
233 self.tx_status_manager.status_update(
234 tx_id,
235 TransactionStatus::submitted(Tai64::from_unix(duration)),
236 );
237 if !has_dependencies {
239 self.selection_algorithm
240 .new_executable_transaction(storage_id, tx);
241 self.new_executable_txs_notifier.send_replace(());
242 }
243
244 let status = statuses::SqueezedOut {
245 reason: Error::Removed(RemovedReason::LessWorth(tx_id)).to_string(),
246 };
247
248 let removed_transactions = removed_transactions
249 .into_iter()
250 .map(|data| {
251 let removed_tx_id = data.transaction.id();
252
253 (removed_tx_id, status.clone())
254 })
255 .collect::<Vec<_>>();
256 if !removed_transactions.is_empty() {
257 self.tx_status_manager
258 .squeezed_out_txs(removed_transactions);
259 }
260
261 self.update_stats();
262 Ok(())
263 }
264
265 fn update_stats(&self) {
266 let _ = self.pool_stats_sender.send(TxPoolStats {
267 tx_count: self.tx_count() as u64,
268 total_size: self.current_bytes_size as u64,
269 total_gas: self.current_gas,
270 });
271 }
272
273 pub fn can_insert_transaction(
275 &self,
276 tx: ArcPoolTx,
277 persistent_storage: &impl TxPoolPersistentStorage,
278 ) -> Result<CanStoreTransaction<'_, S>, InsertionErrorType> {
279 if tx.max_gas() == 0 {
280 return Err(InsertionErrorType::Error(Error::InputValidation(
281 InputValidationError::MaxGasZero,
282 )))
283 }
284
285 let tx_id = tx.id();
286 if self.tx_id_to_storage_id.contains_key(&tx_id) {
287 return Err(InsertionErrorType::Error(Error::InputValidation(
288 InputValidationError::DuplicateTxId(tx_id),
289 )))
290 }
291
292 self.config
293 .black_list
294 .check_blacklisting(&tx)
295 .map_err(Error::Blacklisted)?;
296
297 Self::check_blob_does_not_exist(&tx, persistent_storage)?;
298 self.storage.validate_inputs(
299 &tx,
300 persistent_storage,
301 &self.extracted_outputs,
302 &self.spent_inputs,
303 self.config.utxo_validation,
304 )?;
305
306 let collisions = self.collision_manager.find_collisions(&tx)?;
307 let checked_transaction = self.storage.can_store_transaction(tx)?;
308
309 for collision in collisions.keys() {
310 if checked_transaction.all_dependencies().contains(collision) {
311 return Err(InsertionErrorType::Error(Error::Dependency(
312 DependencyError::NotInsertedCollisionIsDependency,
313 )));
314 }
315 }
316
317 let has_dependencies = !checked_transaction.all_dependencies().is_empty();
318
319 collisions
320 .check_collision_requirements(
321 checked_transaction.tx(),
322 has_dependencies,
323 &self.storage,
324 )
325 .map_err(Error::Collided)?;
326
327 let can_fit_into_pool = self.can_fit_into_pool(&checked_transaction)?;
328
329 let mut transactions_to_remove = vec![];
330 if let SpaceCheckResult::NotEnoughSpace(left) = can_fit_into_pool {
331 transactions_to_remove = self.find_free_space(left, &checked_transaction)?;
332 }
333
334 let can_store_transaction = CanStoreTransaction {
335 checked_transaction,
336 transactions_to_remove,
337 collisions,
338 _guard: &self.storage,
339 };
340
341 Ok(can_store_transaction)
342 }
343
344 fn record_transaction_time_in_txpool(tx: &StorageData) {
345 if let Ok(elapsed) = tx.creation_instant.elapsed() {
346 txpool_metrics()
347 .transaction_time_in_txpool_secs
348 .observe(elapsed.as_secs_f64());
349 } else {
350 tracing::warn!("Failed to calculate transaction time in txpool");
351 }
352 }
353
354 fn record_select_transaction_time(start: Instant) {
355 let elapsed = start.elapsed().as_micros() as f64;
356 txpool_metrics()
357 .select_transactions_time_microseconds
358 .observe(elapsed);
359 }
360
361 fn register_transaction_counts(&self) {
362 if self.config.metrics {
363 let num_transactions = self.tx_count();
364 let executable_txs =
365 self.selection_algorithm.number_of_executable_transactions();
366 txpool_metrics()
367 .number_of_transactions
368 .set(num_transactions as i64);
369 txpool_metrics()
370 .number_of_executable_transactions
371 .set(executable_txs as i64);
372 }
373 }
374
375 pub fn extract_transactions_for_block(
379 &mut self,
380 constraints: Constraints,
381 ) -> Vec<ArcPoolTx> {
382 let metrics = self.config.metrics;
383 let maybe_start = metrics.then(std::time::Instant::now);
384 let best_txs = self
385 .selection_algorithm
386 .gather_best_txs(constraints, &mut self.storage);
387
388 if let Some(start) = maybe_start {
389 Self::record_select_transaction_time(start)
390 };
391
392 if metrics {
393 best_txs.iter().for_each(|storage_data| {
394 Self::record_transaction_time_in_txpool(storage_data)
395 });
396 }
397
398 let txs = best_txs
399 .into_iter()
400 .map(|storage_entry| {
401 self.extracted_outputs
402 .new_extracted_transaction(&storage_entry.transaction);
403 self.spent_inputs.maybe_spend_inputs(
404 storage_entry.transaction.id(),
405 storage_entry.transaction.inputs(),
406 );
407 self.update_components_and_caches_on_removal(iter::once(&storage_entry));
408 storage_entry.transaction
409 })
410 .collect::<Vec<_>>();
411
412 self.update_stats();
413
414 txs
415 }
416
417 pub fn get(&self, tx_id: &TxId) -> Option<&StorageData> {
418 Storage::get(&self.storage, self.tx_id_to_storage_id.get(tx_id)?)
419 }
420
421 pub fn contains(&self, tx_id: &TxId) -> bool {
422 self.tx_id_to_storage_id.contains_key(tx_id)
423 }
424
425 pub fn iter_tx_ids(&self) -> impl Iterator<Item = &TxId> {
426 self.tx_id_to_storage_id.keys()
427 }
428
429 pub fn process_committed_transactions(&mut self, tx_ids: impl Iterator<Item = TxId>) {
433 let mut transactions_to_promote = vec![];
434 for tx_id in tx_ids {
435 self.spent_inputs.spend_inputs_by_tx_id(tx_id);
436 if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
437 let dependents: Vec<S::StorageIndex> =
438 self.storage.get_direct_dependents(storage_id).collect();
439 let Some(transaction) = self.storage.remove_transaction(storage_id)
440 else {
441 debug_assert!(false, "Storage data not found for the transaction");
442 tracing::warn!(
443 "Storage data not found for the transaction during `remove_transaction`."
444 );
445 continue
446 };
447 self.extracted_outputs
448 .new_extracted_transaction(&transaction.transaction);
449 self.spent_inputs
450 .spend_inputs(tx_id, transaction.transaction.inputs());
451 self.update_components_and_caches_on_removal(iter::once(&transaction));
452
453 for dependent in dependents {
454 if !self.storage.has_dependencies(&dependent) {
455 transactions_to_promote.push(dependent);
456 }
457 }
458 }
459 }
460
461 let mut new_executable_transaction = false;
462 for promote in transactions_to_promote {
463 let Some(storage_data) = self.storage.get(&promote) else {
464 debug_assert!(
465 false,
466 "Dependent storage data not found for the transaction"
467 );
468 tracing::warn!(
469 "Dependent storage data not found for \
470 the transaction during `remove_transaction`."
471 );
472 continue
473 };
474
475 self.selection_algorithm
476 .new_executable_transaction(promote, storage_data);
477 new_executable_transaction = true;
478 }
479
480 if new_executable_transaction {
481 self.new_executable_txs_notifier.send_replace(());
482 }
483
484 self.update_stats();
485 }
486
487 fn can_fit_into_pool(
497 &self,
498 checked_transaction: &S::CheckedTransaction,
499 ) -> Result<SpaceCheckResult, Error> {
500 let tx = checked_transaction.tx();
501 let tx_gas = tx.max_gas();
502 let bytes_size = tx.metered_bytes_size();
503 let gas_left = self.current_gas.saturating_add(tx_gas);
504 let bytes_left = self.current_bytes_size.saturating_add(bytes_size);
505 let txs_left = self.tx_id_to_storage_id.len().saturating_add(1);
506 if gas_left <= self.config.pool_limits.max_gas
507 && bytes_left <= self.config.pool_limits.max_bytes_size
508 && txs_left <= self.config.pool_limits.max_txs
509 {
510 return Ok(SpaceCheckResult::EnoughSpace);
511 }
512
513 let has_dependencies = !checked_transaction.all_dependencies().is_empty();
514
515 if has_dependencies {
517 return Err(Error::NotInsertedLimitHit);
518 }
519
520 let left = NotEnoughSpace {
521 gas_left,
522 bytes_left,
523 txs_left,
524 };
525
526 Ok(SpaceCheckResult::NotEnoughSpace(left))
527 }
528
529 fn find_free_space(
536 &self,
537 left: NotEnoughSpace,
538 checked_transaction: &S::CheckedTransaction,
539 ) -> Result<Vec<S::StorageIndex>, Error> {
540 let tx = checked_transaction.tx();
541 let NotEnoughSpace {
542 mut gas_left,
543 mut bytes_left,
544 mut txs_left,
545 } = left;
546
547 let new_tx_ratio = Ratio::new(tx.tip(), tx.max_gas());
550
551 let mut sorted_txs = self.selection_algorithm.get_less_worth_txs();
555
556 let mut transactions_to_remove = vec![];
557
558 while gas_left > self.config.pool_limits.max_gas
559 || bytes_left > self.config.pool_limits.max_bytes_size
560 || txs_left > self.config.pool_limits.max_txs
561 {
562 let storage_id = sorted_txs.next().ok_or(Error::NotInsertedLimitHit)?;
563
564 if checked_transaction.all_dependencies().contains(storage_id) {
565 continue
566 }
567
568 debug_assert!(!self.storage.has_dependencies(storage_id));
569
570 let Some(storage_data) = self.storage.get(storage_id) else {
571 debug_assert!(
572 false,
573 "Storage data not found for one of the less worth transactions"
574 );
575 tracing::warn!(
576 "Storage data not found for one of the less \
577 worth transactions during `find_free_space`."
578 );
579 continue
580 };
581 let ratio = Ratio::new(
582 storage_data.dependents_cumulative_tip,
583 storage_data.dependents_cumulative_gas,
584 );
585
586 if ratio > new_tx_ratio {
587 return Err(Error::NotInsertedLimitHit);
588 }
589
590 let gas = storage_data.dependents_cumulative_gas;
607 let bytes = storage_data.dependents_cumulative_bytes_size;
608 let txs = storage_data.number_dependents_in_chain;
609
610 gas_left = gas_left.saturating_sub(gas);
611 bytes_left = bytes_left.saturating_sub(bytes);
612 txs_left = txs_left.saturating_sub(txs);
613
614 transactions_to_remove.push(*storage_id);
615 }
616
617 Ok(transactions_to_remove)
618 }
619
620 pub fn remove_transactions_and_dependents<I>(
622 &mut self,
623 tx_ids: I,
624 tx_status: statuses::SqueezedOut,
625 ) where
626 I: IntoIterator<Item = TxId>,
627 {
628 let mut removed_transactions = vec![];
629 for tx_id in tx_ids {
630 if let Some(storage_id) = self.tx_id_to_storage_id.remove(&tx_id) {
631 let removed = self
632 .storage
633 .remove_transaction_and_dependents_subtree(storage_id);
634 self.update_components_and_caches_on_removal(removed.iter());
635 removed_transactions.extend(removed.into_iter().map(|data| {
636 let tx_id = data.transaction.id();
637
638 (tx_id, tx_status.clone())
639 }));
640 }
641 }
642 if !removed_transactions.is_empty() {
643 self.tx_status_manager
644 .squeezed_out_txs(removed_transactions);
645 }
646 self.update_stats();
647 }
648
649 pub fn remove_skipped_transaction(&mut self, tx_id: TxId, reason: String) {
650 if self.tx_id_to_storage_id.contains_key(&tx_id) {
654 let tx_status = statuses::SqueezedOut {
655 reason: Error::SkippedTransaction(format!(
656 "Transaction with id: {tx_id}, was removed because of: {reason}"
657 ))
658 .to_string(),
659 };
660 self.remove_transactions_and_dependents(iter::once(tx_id), tx_status);
661 }
662
663 self.extracted_outputs.new_skipped_transaction(&tx_id);
664 self.spent_inputs.unspend_inputs(tx_id);
665
666 let coin_dependents = self.collision_manager.get_coins_spenders(&tx_id);
667 if !coin_dependents.is_empty() {
668 let tx_status = statuses::SqueezedOut {
669 reason: Error::SkippedTransaction(format!(
670 "Parent transaction with id: {tx_id}, was removed because of: {reason}"
671 ))
672 .to_string(),
673 };
674
675 for dependent in coin_dependents {
676 let removed = self
677 .storage
678 .remove_transaction_and_dependents_subtree(dependent);
679 self.update_components_and_caches_on_removal(removed.iter());
680 let removed_txs: Vec<_> = removed
684 .into_iter()
685 .map(|data| {
686 let tx_id = data.transaction.id();
687 (tx_id, tx_status.clone())
688 })
689 .collect();
690 if !removed_txs.is_empty() {
691 self.tx_status_manager.squeezed_out_txs(removed_txs);
692 }
693 }
694 }
695
696 self.update_stats();
697 }
698
699 fn check_blob_does_not_exist(
700 tx: &PoolTransaction,
701 persistent_storage: &impl TxPoolPersistentStorage,
702 ) -> Result<(), Error> {
703 if let PoolTransaction::Blob(checked_tx, _) = &tx {
704 let blob_id = checked_tx.transaction().blob_id();
705 if persistent_storage
706 .blob_exist(blob_id)
707 .map_err(|e| Error::Database(format!("{:?}", e)))?
708 {
709 return Err(Error::InputValidation(
710 InputValidationError::NotInsertedBlobIdAlreadyTaken(*blob_id),
711 ));
712 }
713 }
714 Ok(())
715 }
716
717 fn update_components_and_caches_on_removal<'a>(
718 &mut self,
719 removed_transactions: impl Iterator<Item = &'a StorageData>,
720 ) {
721 for storage_entry in removed_transactions {
722 let tx = &storage_entry.transaction;
723 self.current_gas = self.current_gas.saturating_sub(tx.max_gas());
724 self.current_bytes_size = self
725 .current_bytes_size
726 .saturating_sub(tx.metered_bytes_size());
727 self.tx_id_to_storage_id.remove(&tx.id());
728 self.collision_manager.on_removed_transaction(tx);
729 self.selection_algorithm
730 .on_removed_transaction(storage_entry);
731 }
732 self.register_transaction_counts();
733 }
734
735 #[cfg(test)]
736 pub fn assert_integrity(&self, mut expected_txs: HashSet<TxId>) {
737 for tx in &self.tx_id_to_storage_id {
738 if !expected_txs.remove(tx.0) {
739 panic!(
740 "Transaction with id {:?} is not in the expected transactions",
741 tx.0
742 );
743 }
744 }
745 assert!(
746 expected_txs.is_empty(),
747 "Some transactions are not found in the pool"
748 );
749 }
750}
751
752pub struct NotEnoughSpace {
753 gas_left: u64,
754 bytes_left: usize,
755 txs_left: usize,
756}
757
758pub enum SpaceCheckResult {
760 EnoughSpace,
761 NotEnoughSpace(NotEnoughSpace),
762}
763
764pub struct CanStoreTransaction<'a, S>
765where
766 S: Storage,
767{
768 checked_transaction: S::CheckedTransaction,
770 transactions_to_remove: Vec<S::StorageIndex>,
772 collisions: Collisions<S::StorageIndex>,
774 _guard: &'a S,
776}