1use crate::{
2 common::{
3 fuel_asm::Word,
4 fuel_storage::{
5 StorageAsRef,
6 StorageInspect,
7 },
8 fuel_tx::{
9 field::{
10 Inputs,
11 Outputs,
12 },
13 Bytes32,
14 Cacheable,
15 Chargeable,
16 Checked,
17 ConsensusParameters,
18 ContractId,
19 Create,
20 Input,
21 Output,
22 Script,
23 Transaction,
24 TxId,
25 UniqueIdentifier,
26 UtxoId,
27 },
28 fuel_types::MessageId,
29 fuel_vm::storage::ContractsRawCode,
30 },
31 db::{
32 Coins,
33 Error as DbStateError,
34 KvStoreError,
35 Messages,
36 },
37 model::{
38 ArcPoolTx,
39 BlockHeight,
40 BlockId,
41 Coin,
42 Message,
43 TxInfo,
44 },
45};
46use derive_more::{
47 Deref,
48 DerefMut,
49};
50use fuel_vm::prelude::{
51 Interpreter,
52 PredicateStorage,
53 ProgramState,
54};
55use std::{
56 fmt::Debug,
57 sync::Arc,
58};
59use tai64::Tai64;
60use thiserror::Error;
61use tokio::sync::{
62 mpsc,
63 oneshot,
64};
65
66#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
68#[derive(Clone, Debug, PartialEq, Eq)]
69pub enum TransactionStatus {
70 Submitted {
71 time: Tai64,
72 },
73 Success {
74 block_id: BlockId,
75 time: Tai64,
76 result: Option<ProgramState>,
77 },
78 SqueezedOut {
79 reason: String,
80 },
81 Failed {
82 block_id: BlockId,
83 time: Tai64,
84 reason: String,
85 result: Option<ProgramState>,
86 },
87}
88
89#[derive(Debug, Eq, PartialEq)]
91pub enum PoolTransaction {
92 Script(Checked<Script>),
93 Create(Checked<Create>),
94}
95
96impl Chargeable for PoolTransaction {
97 fn price(&self) -> Word {
98 match self {
99 PoolTransaction::Script(script) => script.transaction().price(),
100 PoolTransaction::Create(create) => create.transaction().price(),
101 }
102 }
103
104 fn limit(&self) -> Word {
105 match self {
106 PoolTransaction::Script(script) => script.transaction().limit(),
107 PoolTransaction::Create(create) => create.transaction().limit(),
108 }
109 }
110
111 fn metered_bytes_size(&self) -> usize {
112 match self {
113 PoolTransaction::Script(script) => script.transaction().metered_bytes_size(),
114 PoolTransaction::Create(create) => create.transaction().metered_bytes_size(),
115 }
116 }
117}
118
119impl UniqueIdentifier for PoolTransaction {
120 fn id(&self) -> Bytes32 {
121 match self {
122 PoolTransaction::Script(script) => script.transaction().id(),
123 PoolTransaction::Create(create) => create.transaction().id(),
124 }
125 }
126}
127
128impl PoolTransaction {
129 pub fn is_computed(&self) -> bool {
130 match self {
131 PoolTransaction::Script(script) => script.transaction().is_computed(),
132 PoolTransaction::Create(create) => create.transaction().is_computed(),
133 }
134 }
135
136 pub fn inputs(&self) -> &Vec<Input> {
137 match self {
138 PoolTransaction::Script(script) => script.transaction().inputs(),
139 PoolTransaction::Create(create) => create.transaction().inputs(),
140 }
141 }
142
143 pub fn outputs(&self) -> &Vec<Output> {
144 match self {
145 PoolTransaction::Script(script) => script.transaction().outputs(),
146 PoolTransaction::Create(create) => create.transaction().outputs(),
147 }
148 }
149
150 pub fn max_gas(&self) -> Word {
151 match self {
152 PoolTransaction::Script(script) => script.metadata().fee.max_gas(),
153 PoolTransaction::Create(create) => create.metadata().fee.max_gas(),
154 }
155 }
156
157 pub fn check_predicates(&self, params: ConsensusParameters) -> bool {
158 match self {
159 PoolTransaction::Script(script) => {
160 Interpreter::<PredicateStorage>::check_predicates(script.clone(), params)
161 }
162 PoolTransaction::Create(create) => {
163 Interpreter::<PredicateStorage>::check_predicates(create.clone(), params)
164 }
165 }
166 }
167}
168
169impl From<&PoolTransaction> for Transaction {
170 fn from(tx: &PoolTransaction) -> Self {
171 match tx {
172 PoolTransaction::Script(script) => {
173 Transaction::Script(script.transaction().clone())
174 }
175 PoolTransaction::Create(create) => {
176 Transaction::Create(create.transaction().clone())
177 }
178 }
179 }
180}
181
182impl From<Checked<Script>> for PoolTransaction {
183 fn from(checked: Checked<Script>) -> Self {
184 Self::Script(checked)
185 }
186}
187
188impl From<Checked<Create>> for PoolTransaction {
189 fn from(checked: Checked<Create>) -> Self {
190 Self::Create(checked)
191 }
192}
193
194#[derive(Debug)]
197pub struct InsertionResult {
198 pub inserted: ArcPoolTx,
199 pub removed: Vec<ArcPoolTx>,
200}
201
202pub trait TxPoolDb:
203 StorageInspect<Coins, Error = KvStoreError>
204 + StorageInspect<ContractsRawCode, Error = DbStateError>
205 + StorageInspect<Messages, Error = KvStoreError>
206 + Send
207 + Sync
208{
209 fn utxo(&self, utxo_id: &UtxoId) -> Result<Option<Coin>, KvStoreError> {
210 self.storage::<Coins>()
211 .get(utxo_id)
212 .map(|t| t.map(|t| t.as_ref().clone()))
213 }
214
215 fn contract_exist(&self, contract_id: &ContractId) -> Result<bool, DbStateError> {
216 self.storage::<ContractsRawCode>().contains_key(contract_id)
217 }
218
219 fn message(&self, message_id: &MessageId) -> Result<Option<Message>, KvStoreError> {
220 self.storage::<Messages>()
221 .get(message_id)
222 .map(|t| t.map(|t| t.as_ref().clone()))
223 }
224
225 fn current_block_height(&self) -> Result<BlockHeight, KvStoreError>;
226}
227
228#[derive(Clone, Deref, DerefMut)]
230pub struct Sender(mpsc::Sender<TxPoolMpsc>);
231
232impl Sender {
233 pub fn new(sender: mpsc::Sender<TxPoolMpsc>) -> Self {
234 Self(sender)
235 }
236
237 pub async fn insert(
238 &self,
239 txs: Vec<Arc<Transaction>>,
240 ) -> anyhow::Result<Vec<anyhow::Result<InsertionResult>>> {
241 let (response, receiver) = oneshot::channel();
242 self.send(TxPoolMpsc::Insert { txs, response }).await?;
243 receiver.await.map_err(Into::into)
244 }
245
246 pub async fn find(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<Option<TxInfo>>> {
247 let (response, receiver) = oneshot::channel();
248 self.send(TxPoolMpsc::Find { ids, response }).await?;
249 receiver.await.map_err(Into::into)
250 }
251
252 pub async fn find_one(&self, id: TxId) -> anyhow::Result<Option<TxInfo>> {
253 let (response, receiver) = oneshot::channel();
254 self.send(TxPoolMpsc::FindOne { id, response }).await?;
255 receiver.await.map_err(Into::into)
256 }
257
258 pub async fn find_dependent(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>> {
259 let (response, receiver) = oneshot::channel();
260 self.send(TxPoolMpsc::FindDependent { ids, response })
261 .await?;
262 receiver.await.map_err(Into::into)
263 }
264
265 pub async fn filter_by_negative(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<TxId>> {
266 let (response, receiver) = oneshot::channel();
267 self.send(TxPoolMpsc::FilterByNegative { ids, response })
268 .await?;
269 receiver.await.map_err(Into::into)
270 }
271
272 pub async fn includable(&self) -> anyhow::Result<Vec<ArcPoolTx>> {
273 let (response, receiver) = oneshot::channel();
274 self.send(TxPoolMpsc::Includable { response }).await?;
275 receiver.await.map_err(Into::into)
276 }
277
278 pub async fn remove(&self, ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>> {
279 let (response, receiver) = oneshot::channel();
280 self.send(TxPoolMpsc::Remove { ids, response }).await?;
281 receiver.await.map_err(Into::into)
282 }
283
284 pub fn channel(buffer: usize) -> (Sender, mpsc::Receiver<TxPoolMpsc>) {
285 let (sender, reciever) = mpsc::channel(buffer);
286 (Sender(sender), reciever)
287 }
288}
289
290#[async_trait::async_trait]
291impl super::poa_coordinator::TransactionPool for Sender {
292 async fn pending_number(&self) -> anyhow::Result<usize> {
293 let (response, receiver) = oneshot::channel();
294 self.send(TxPoolMpsc::PendingNumber { response }).await?;
295 receiver.await.map_err(Into::into)
296 }
297
298 async fn total_consumable_gas(&self) -> anyhow::Result<u64> {
299 let (response, receiver) = oneshot::channel();
300 self.send(TxPoolMpsc::ConsumableGas { response }).await?;
301 receiver.await.map_err(Into::into)
302 }
303
304 async fn remove_txs(&mut self, ids: Vec<TxId>) -> anyhow::Result<Vec<ArcPoolTx>> {
305 let (response, receiver) = oneshot::channel();
306 self.send(TxPoolMpsc::Remove { ids, response }).await?;
307 receiver.await.map_err(Into::into)
308 }
309}
310
311#[derive(Debug)]
314pub enum TxPoolMpsc {
315 PendingNumber { response: oneshot::Sender<usize> },
317 ConsumableGas { response: oneshot::Sender<u64> },
319 Includable {
322 response: oneshot::Sender<Vec<ArcPoolTx>>,
323 },
324 Insert {
331 txs: Vec<Arc<Transaction>>,
332 response: oneshot::Sender<Vec<anyhow::Result<InsertionResult>>>,
333 },
334 Find {
336 ids: Vec<TxId>,
337 response: oneshot::Sender<Vec<Option<TxInfo>>>,
338 },
339 FindOne {
341 id: TxId,
342 response: oneshot::Sender<Option<TxInfo>>,
343 },
344 FindDependent {
346 ids: Vec<TxId>,
347 response: oneshot::Sender<Vec<ArcPoolTx>>,
348 },
349 Remove {
351 ids: Vec<TxId>,
352 response: oneshot::Sender<Vec<ArcPoolTx>>,
353 },
354 FilterByNegative {
359 ids: Vec<TxId>,
360 response: oneshot::Sender<Vec<TxId>>,
361 },
362 Stop,
364}
365
366#[derive(Clone, Debug, Eq, PartialEq)]
367pub enum TxStatus {
368 Submitted,
370 Completed,
374 SqueezedOut { reason: Error },
376}
377
378#[derive(Clone, Debug, Eq, PartialEq)]
379pub struct TxUpdate {
380 tx_id: Bytes32,
381 squeezed_out: Option<Error>,
382}
383
384impl TxUpdate {
385 pub fn updated(tx_id: Bytes32) -> Self {
386 Self {
387 tx_id,
388 squeezed_out: None,
389 }
390 }
391
392 pub fn squeezed_out(tx_id: Bytes32, reason: Error) -> Self {
393 Self {
394 tx_id,
395 squeezed_out: Some(reason),
396 }
397 }
398
399 pub fn tx_id(&self) -> &Bytes32 {
400 &self.tx_id
401 }
402
403 pub fn was_squeezed_out(&self) -> bool {
404 self.squeezed_out.is_some()
405 }
406
407 pub fn into_squeezed_out_reason(self) -> Option<Error> {
408 self.squeezed_out
409 }
410}
411
412#[derive(Error, Debug, PartialEq, Eq, Clone)]
413#[non_exhaustive]
414pub enum Error {
415 #[error("TxPool required that transaction contains metadata")]
416 NoMetadata,
417 #[error("TxPool doesn't support this type of transaction.")]
418 NotSupportedTransactionType,
419 #[error("Transaction is not inserted. Hash is already known")]
420 NotInsertedTxKnown,
421 #[error("Transaction is not inserted. Pool limit is hit, try to increase gas_price")]
422 NotInsertedLimitHit,
423 #[error("Transaction is not inserted. The gas price is too low.")]
424 NotInsertedGasPriceTooLow,
425 #[error(
426 "Transaction is not inserted. More priced tx {0:#x} already spend this UTXO output: {1:#x}"
427 )]
428 NotInsertedCollision(TxId, UtxoId),
429 #[error(
430 "Transaction is not inserted. More priced tx has created contract with ContractId {0:#x}"
431 )]
432 NotInsertedCollisionContractId(ContractId),
433 #[error(
434 "Transaction is not inserted. A higher priced tx {0:#x} is already spending this messageId: {1:#x}"
435 )]
436 NotInsertedCollisionMessageId(TxId, MessageId),
437 #[error(
438 "Transaction is not inserted. Dependent UTXO output is not existing: {0:#x}"
439 )]
440 NotInsertedOutputNotExisting(UtxoId),
441 #[error("Transaction is not inserted. UTXO input contract is not existing: {0:#x}")]
442 NotInsertedInputContractNotExisting(ContractId),
443 #[error("Transaction is not inserted. ContractId is already taken {0:#x}")]
444 NotInsertedContractIdAlreadyTaken(ContractId),
445 #[error("Transaction is not inserted. UTXO is not existing: {0:#x}")]
446 NotInsertedInputUtxoIdNotExisting(UtxoId),
447 #[error("Transaction is not inserted. UTXO is spent: {0:#x}")]
448 NotInsertedInputUtxoIdSpent(UtxoId),
449 #[error("Transaction is not inserted. Message is spent: {0:#x}")]
450 NotInsertedInputMessageIdSpent(MessageId),
451 #[error("Transaction is not inserted. Message id {0:#x} does not match any received message from the DA layer.")]
452 NotInsertedInputMessageUnknown(MessageId),
453 #[error(
454 "Transaction is not inserted. UTXO requires Contract input {0:#x} that is priced lower"
455 )]
456 NotInsertedContractPricedLower(ContractId),
457 #[error("Transaction is not inserted. Input output mismatch. Coin owner is different from expected input")]
458 NotInsertedIoWrongOwner,
459 #[error("Transaction is not inserted. Input output mismatch. Coin output does not match expected input")]
460 NotInsertedIoWrongAmount,
461 #[error("Transaction is not inserted. Input output mismatch. Coin output asset_id does not match expected inputs")]
462 NotInsertedIoWrongAssetId,
463 #[error("Transaction is not inserted. The computed message id doesn't match the provided message id.")]
464 NotInsertedIoWrongMessageId,
465 #[error(
466 "Transaction is not inserted. Input output mismatch. Expected coin but output is contract"
467 )]
468 NotInsertedIoContractOutput,
469 #[error(
470 "Transaction is not inserted. Input output mismatch. Expected coin but output is message"
471 )]
472 NotInsertedIoMessageInput,
473 #[error("Transaction is not inserted. Maximum depth of dependent transaction chain reached")]
474 NotInsertedMaxDepth,
475 #[error("Transaction exceeds the max gas per block limit. Tx gas: {tx_gas}, block limit {block_limit}")]
476 NotInsertedMaxGasLimit { tx_gas: Word, block_limit: Word },
477 #[error("Transaction removed.")]
479 Removed,
480 #[error("Transaction squeezed out because {0}")]
481 SqueezedOut(String),
482}