fuel_core_interfaces/
txpool.rs

1use crate::{
2    common::{
3        fuel_asm::Word,
4        fuel_storage::{
5            StorageAsRef,
6            StorageInspect,
7        },
8        fuel_tx::{
9            field::{
10                Inputs,
11                Outputs,
12            },
13            Bytes32,
14            Cacheable,
15            Chargeable,
16            Checked,
17            ConsensusParameters,
18            ContractId,
19            Create,
20            Input,
21            Output,
22            Script,
23            Transaction,
24            TxId,
25            UniqueIdentifier,
26            UtxoId,
27        },
28        fuel_types::MessageId,
29        fuel_vm::storage::ContractsRawCode,
30    },
31    db::{
32        Coins,
33        Error as DbStateError,
34        KvStoreError,
35        Messages,
36    },
37    model::{
38        ArcPoolTx,
39        BlockHeight,
40        BlockId,
41        Coin,
42        Message,
43        TxInfo,
44    },
45};
46use derive_more::{
47    Deref,
48    DerefMut,
49};
50use fuel_vm::prelude::{
51    Interpreter,
52    PredicateStorage,
53    ProgramState,
54};
55use std::{
56    fmt::Debug,
57    sync::Arc,
58};
59use tai64::Tai64;
60use thiserror::Error;
61use tokio::sync::{
62    mpsc,
63    oneshot,
64};
65
66/// The status of the transaction during its life from the tx pool until the block.
67#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
68#[derive(Clone, Debug, PartialEq, Eq)]
69pub enum TransactionStatus {
70    Submitted {
71        time: Tai64,
72    },
73    Success {
74        block_id: BlockId,
75        time: Tai64,
76        result: Option<ProgramState>,
77    },
78    SqueezedOut {
79        reason: String,
80    },
81    Failed {
82        block_id: BlockId,
83        time: Tai64,
84        reason: String,
85        result: Option<ProgramState>,
86    },
87}
88
89/// Transaction used by the transaction pool.
90#[derive(Debug, Eq, PartialEq)]
91pub enum PoolTransaction {
92    Script(Checked<Script>),
93    Create(Checked<Create>),
94}
95
96impl Chargeable for PoolTransaction {
97    fn price(&self) -> Word {
98        match self {
99            PoolTransaction::Script(script) => script.transaction().price(),
100            PoolTransaction::Create(create) => create.transaction().price(),
101        }
102    }
103
104    fn limit(&self) -> Word {
105        match self {
106            PoolTransaction::Script(script) => script.transaction().limit(),
107            PoolTransaction::Create(create) => create.transaction().limit(),
108        }
109    }
110
111    fn metered_bytes_size(&self) -> usize {
112        match self {
113            PoolTransaction::Script(script) => script.transaction().metered_bytes_size(),
114            PoolTransaction::Create(create) => create.transaction().metered_bytes_size(),
115        }
116    }
117}
118
119impl UniqueIdentifier for PoolTransaction {
120    fn id(&self) -> Bytes32 {
121        match self {
122            PoolTransaction::Script(script) => script.transaction().id(),
123            PoolTransaction::Create(create) => create.transaction().id(),
124        }
125    }
126}
127
128impl PoolTransaction {
129    pub fn is_computed(&self) -> bool {
130        match self {
131            PoolTransaction::Script(script) => script.transaction().is_computed(),
132            PoolTransaction::Create(create) => create.transaction().is_computed(),
133        }
134    }
135
136    pub fn inputs(&self) -> &Vec<Input> {
137        match self {
138            PoolTransaction::Script(script) => script.transaction().inputs(),
139            PoolTransaction::Create(create) => create.transaction().inputs(),
140        }
141    }
142
143    pub fn outputs(&self) -> &Vec<Output> {
144        match self {
145            PoolTransaction::Script(script) => script.transaction().outputs(),
146            PoolTransaction::Create(create) => create.transaction().outputs(),
147        }
148    }
149
150    pub fn max_gas(&self) -> Word {
151        match self {
152            PoolTransaction::Script(script) => script.metadata().fee.max_gas(),
153            PoolTransaction::Create(create) => create.metadata().fee.max_gas(),
154        }
155    }
156
157    pub fn check_predicates(&self, params: ConsensusParameters) -> bool {
158        match self {
159            PoolTransaction::Script(script) => {
160                Interpreter::<PredicateStorage>::check_predicates(script.clone(), params)
161            }
162            PoolTransaction::Create(create) => {
163                Interpreter::<PredicateStorage>::check_predicates(create.clone(), params)
164            }
165        }
166    }
167}
168
169impl From<&PoolTransaction> for Transaction {
170    fn from(tx: &PoolTransaction) -> Self {
171        match tx {
172            PoolTransaction::Script(script) => {
173                Transaction::Script(script.transaction().clone())
174            }
175            PoolTransaction::Create(create) => {
176                Transaction::Create(create.transaction().clone())
177            }
178        }
179    }
180}
181
182impl From<Checked<Script>> for PoolTransaction {
183    fn from(checked: Checked<Script>) -> Self {
184        Self::Script(checked)
185    }
186}
187
188impl From<Checked<Create>> for PoolTransaction {
189    fn from(checked: Checked<Create>) -> Self {
190        Self::Create(checked)
191    }
192}
193
194/// The `removed` field contains the list of removed transactions during the insertion
195/// of the `inserted` transaction.
196#[derive(Debug)]
197pub struct InsertionResult {
198    pub inserted: ArcPoolTx,
199    pub removed: Vec<ArcPoolTx>,
200}
201
202pub trait TxPoolDb:
203    StorageInspect<Coins, Error = KvStoreError>
204    + StorageInspect<ContractsRawCode, Error = DbStateError>
205    + StorageInspect<Messages, Error = KvStoreError>
206    + Send
207    + Sync
208{
209    fn utxo(&self, utxo_id: &UtxoId) -> Result<Option<Coin>, KvStoreError> {
210        self.storage::<Coins>()
211            .get(utxo_id)
212            .map(|t| t.map(|t| t.as_ref().clone()))
213    }
214
215    fn contract_exist(&self, contract_id: &ContractId) -> Result<bool, DbStateError> {
216        self.storage::<ContractsRawCode>().contains_key(contract_id)
217    }
218
219    fn message(&self, message_id: &MessageId) -> Result<Option<Message>, KvStoreError> {
220        self.storage::<Messages>()
221            .get(message_id)
222            .map(|t| t.map(|t| t.as_ref().clone()))
223    }
224
225    fn current_block_height(&self) -> Result<BlockHeight, KvStoreError>;
226}
227
228/// RPC client for doing calls to the TxPool through an MPSC channel.
229#[derive(Clone, Deref, DerefMut)]
230pub struct Sender(mpsc::Sender<TxPoolMpsc>);
231
232impl Sender {
233    pub fn new(sender: mpsc::Sender<TxPoolMpsc>) -> Self {
234        Self(sender)
235    }
236
237    pub async fn insert(
238        &self,
239        txs: Vec<Arc<Transaction>>,
240    ) -> anyhow::Result<Vec<anyhow::Result<InsertionResult>>> {
241        let (response, receiver) = oneshot::channel();
242        self.send(TxPoolMpsc::Insert { txs, response }).await?;
243        receiver.await.map_err(Into::into)
244    }
245
246    pub async fn find(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<Option<TxInfo>>> {
247        let (response, receiver) = oneshot::channel();
248        self.send(TxPoolMpsc::Find { ids, response }).await?;
249        receiver.await.map_err(Into::into)
250    }
251
252    pub async fn find_one(&self, id: TxId) -> anyhow::Result<Option<TxInfo>> {
253        let (response, receiver) = oneshot::channel();
254        self.send(TxPoolMpsc::FindOne { id, response }).await?;
255        receiver.await.map_err(Into::into)
256    }
257
258    pub async fn find_dependent(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>> {
259        let (response, receiver) = oneshot::channel();
260        self.send(TxPoolMpsc::FindDependent { ids, response })
261            .await?;
262        receiver.await.map_err(Into::into)
263    }
264
265    pub async fn filter_by_negative(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<TxId>> {
266        let (response, receiver) = oneshot::channel();
267        self.send(TxPoolMpsc::FilterByNegative { ids, response })
268            .await?;
269        receiver.await.map_err(Into::into)
270    }
271
272    pub async fn includable(&self) -> anyhow::Result<Vec<ArcPoolTx>> {
273        let (response, receiver) = oneshot::channel();
274        self.send(TxPoolMpsc::Includable { response }).await?;
275        receiver.await.map_err(Into::into)
276    }
277
278    pub async fn remove(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>> {
279        let (response, receiver) = oneshot::channel();
280        self.send(TxPoolMpsc::Remove { ids, response }).await?;
281        receiver.await.map_err(Into::into)
282    }
283
284    pub fn channel(buffer: usize) -> (Sender, mpsc::Receiver<TxPoolMpsc>) {
285        let (sender, reciever) = mpsc::channel(buffer);
286        (Sender(sender), reciever)
287    }
288}
289
290#[async_trait::async_trait]
291impl super::poa_coordinator::TransactionPool for Sender {
292    async fn pending_number(&self) -> anyhow::Result<usize> {
293        let (response, receiver) = oneshot::channel();
294        self.send(TxPoolMpsc::PendingNumber { response }).await?;
295        receiver.await.map_err(Into::into)
296    }
297
298    async fn total_consumable_gas(&self) -> anyhow::Result<u64> {
299        let (response, receiver) = oneshot::channel();
300        self.send(TxPoolMpsc::ConsumableGas { response }).await?;
301        receiver.await.map_err(Into::into)
302    }
303
304    async fn remove_txs(&mut self, ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>> {
305        let (response, receiver) = oneshot::channel();
306        self.send(TxPoolMpsc::Remove { ids, response }).await?;
307        receiver.await.map_err(Into::into)
308    }
309}
310
311/// RPC commands that can be sent to the TxPool through an MPSC channel.
312/// Responses are returned using `response` oneshot channel.
313#[derive(Debug)]
314pub enum TxPoolMpsc {
315    /// The number of pending transactions in the pool.
316    PendingNumber { response: oneshot::Sender<usize> },
317    /// The amount of gas in all includable transactions combined
318    ConsumableGas { response: oneshot::Sender<u64> },
319    /// Return all sorted transactions that are includable in next block.
320    /// This is going to be heavy operation, use it only when needed.
321    Includable {
322        response: oneshot::Sender<Vec<ArcPoolTx>>,
323    },
324    /// import list of transaction into txpool. All needed parents need to be known
325    /// and parent->child order should be enforced in Vec, we will not do that check inside
326    /// txpool and will just drop child and include only parent. Additional restrain is that
327    /// child gas_price needs to be lower then parent gas_price. Transaction can be received
328    /// from p2p **RespondTransactions** or from userland. Because of userland we are returning
329    /// error for every insert for better user experience.
330    Insert {
331        txs: Vec<Arc<Transaction>>,
332        response: oneshot::Sender<Vec<anyhow::Result<InsertionResult>>>,
333    },
334    /// find all tx by their hash
335    Find {
336        ids: Vec<TxId>,
337        response: oneshot::Sender<Vec<Option<TxInfo>>>,
338    },
339    /// find one tx by its hash
340    FindOne {
341        id: TxId,
342        response: oneshot::Sender<Option<TxInfo>>,
343    },
344    /// find all dependent tx and return them with requested dependencies in one list sorted by Price.
345    FindDependent {
346        ids: Vec<TxId>,
347        response: oneshot::Sender<Vec<ArcPoolTx>>,
348    },
349    /// remove transaction from pool needed on user demand. Low priority
350    Remove {
351        ids: Vec<TxId>,
352        response: oneshot::Sender<Vec<ArcPoolTx>>,
353    },
354    /// Iterate over `hashes` and return all hashes that we don't have.
355    /// Needed when we receive list of new hashed from peer with
356    /// **BroadcastTransactionHashes**, so txpool needs to return
357    /// tx that we don't have, and request them from that particular peer.
358    FilterByNegative {
359        ids: Vec<TxId>,
360        response: oneshot::Sender<Vec<TxId>>,
361    },
362    /// stop txpool
363    Stop,
364}
365
366#[derive(Clone, Debug, Eq, PartialEq)]
367pub enum TxStatus {
368    /// Submitted into txpool.
369    Submitted,
370    /// Transaction has either been:
371    /// - successfully executed and included in a block.
372    /// - failed to execute and state changes reverted
373    Completed,
374    /// removed from txpool.
375    SqueezedOut { reason: Error },
376}
377
378#[derive(Clone, Debug, Eq, PartialEq)]
379pub struct TxUpdate {
380    tx_id: Bytes32,
381    squeezed_out: Option<Error>,
382}
383
384impl TxUpdate {
385    pub fn updated(tx_id: Bytes32) -> Self {
386        Self {
387            tx_id,
388            squeezed_out: None,
389        }
390    }
391
392    pub fn squeezed_out(tx_id: Bytes32, reason: Error) -> Self {
393        Self {
394            tx_id,
395            squeezed_out: Some(reason),
396        }
397    }
398
399    pub fn tx_id(&self) -> &Bytes32 {
400        &self.tx_id
401    }
402
403    pub fn was_squeezed_out(&self) -> bool {
404        self.squeezed_out.is_some()
405    }
406
407    pub fn into_squeezed_out_reason(self) -> Option<Error> {
408        self.squeezed_out
409    }
410}
411
412#[derive(Error, Debug, PartialEq, Eq, Clone)]
413#[non_exhaustive]
414pub enum Error {
415    #[error("TxPool required that transaction contains metadata")]
416    NoMetadata,
417    #[error("TxPool doesn't support this type of transaction.")]
418    NotSupportedTransactionType,
419    #[error("Transaction is not inserted. Hash is already known")]
420    NotInsertedTxKnown,
421    #[error("Transaction is not inserted. Pool limit is hit, try to increase gas_price")]
422    NotInsertedLimitHit,
423    #[error("Transaction is not inserted. The gas price is too low.")]
424    NotInsertedGasPriceTooLow,
425    #[error(
426        "Transaction is not inserted. More priced tx {0:#x} already spend this UTXO output: {1:#x}"
427    )]
428    NotInsertedCollision(TxId, UtxoId),
429    #[error(
430        "Transaction is not inserted. More priced tx has created contract with ContractId {0:#x}"
431    )]
432    NotInsertedCollisionContractId(ContractId),
433    #[error(
434        "Transaction is not inserted. A higher priced tx {0:#x} is already spending this messageId: {1:#x}"
435    )]
436    NotInsertedCollisionMessageId(TxId, MessageId),
437    #[error(
438        "Transaction is not inserted. Dependent UTXO output is not existing: {0:#x}"
439    )]
440    NotInsertedOutputNotExisting(UtxoId),
441    #[error("Transaction is not inserted. UTXO input contract is not existing: {0:#x}")]
442    NotInsertedInputContractNotExisting(ContractId),
443    #[error("Transaction is not inserted. ContractId is already taken {0:#x}")]
444    NotInsertedContractIdAlreadyTaken(ContractId),
445    #[error("Transaction is not inserted. UTXO is not existing: {0:#x}")]
446    NotInsertedInputUtxoIdNotExisting(UtxoId),
447    #[error("Transaction is not inserted. UTXO is spent: {0:#x}")]
448    NotInsertedInputUtxoIdSpent(UtxoId),
449    #[error("Transaction is not inserted. Message is spent: {0:#x}")]
450    NotInsertedInputMessageIdSpent(MessageId),
451    #[error("Transaction is not inserted. Message id {0:#x} does not match any received message from the DA layer.")]
452    NotInsertedInputMessageUnknown(MessageId),
453    #[error(
454        "Transaction is not inserted. UTXO requires Contract input {0:#x} that is priced lower"
455    )]
456    NotInsertedContractPricedLower(ContractId),
457    #[error("Transaction is not inserted. Input output mismatch. Coin owner is different from expected input")]
458    NotInsertedIoWrongOwner,
459    #[error("Transaction is not inserted. Input output mismatch. Coin output does not match expected input")]
460    NotInsertedIoWrongAmount,
461    #[error("Transaction is not inserted. Input output mismatch. Coin output asset_id does not match expected inputs")]
462    NotInsertedIoWrongAssetId,
463    #[error("Transaction is not inserted. The computed message id doesn't match the provided message id.")]
464    NotInsertedIoWrongMessageId,
465    #[error(
466        "Transaction is not inserted. Input output mismatch. Expected coin but output is contract"
467    )]
468    NotInsertedIoContractOutput,
469    #[error(
470        "Transaction is not inserted. Input output mismatch. Expected coin but output is message"
471    )]
472    NotInsertedIoMessageInput,
473    #[error("Transaction is not inserted. Maximum depth of dependent transaction chain reached")]
474    NotInsertedMaxDepth,
475    #[error("Transaction exceeds the max gas per block limit. Tx gas: {tx_gas}, block limit {block_limit}")]
476    NotInsertedMaxGasLimit { tx_gas: Word, block_limit: Word },
477    // small todo for now it can pass but in future we should include better messages
478    #[error("Transaction removed.")]
479    Removed,
480    #[error("Transaction squeezed out because {0}")]
481    SqueezedOut(String),
482}