fuel_core/service/adapters/
txpool.rs

1use crate::{
2    database::OnChainIterableKeyValueView,
3    service::adapters::{
4        BlockImporterAdapter,
5        ChainStateInfoProvider,
6        P2PAdapter,
7        PreconfirmationSender,
8        StaticGasPrice,
9    },
10};
11use fuel_core_services::stream::BoxStream;
12use fuel_core_storage::{
13    Result as StorageResult,
14    StorageAsRef,
15    tables::{
16        Coins,
17        ContractsRawCode,
18        Messages,
19        ProcessedTransactions,
20    },
21};
22use fuel_core_txpool::ports::{
23    BlockImporter,
24    ChainStateInfoProvider as ChainStateInfoProviderTrait,
25    GasPriceProvider,
26    TxStatusManager,
27};
28use fuel_core_types::{
29    blockchain::header::ConsensusParametersVersion,
30    entities::{
31        coins::coin::CompressedCoin,
32        relayer::message::Message,
33    },
34    fuel_tx::{
35        BlobId,
36        ConsensusParameters,
37        Transaction,
38        TxId,
39        UtxoId,
40    },
41    fuel_types::{
42        ContractId,
43        Nonce,
44    },
45    fuel_vm::BlobData,
46    services::{
47        block_importer::SharedImportResult,
48        p2p::{
49            GossipsubMessageAcceptance,
50            GossipsubMessageInfo,
51            PeerId,
52            TransactionGossipData,
53        },
54        preconfirmation::{
55            Preconfirmation,
56            PreconfirmationStatus,
57        },
58        transaction_status::{
59            PreConfirmationStatus,
60            TransactionStatus,
61            statuses,
62        },
63    },
64};
65use std::sync::Arc;
66use tokio::sync::broadcast;
67
68impl BlockImporter for BlockImporterAdapter {
69    fn block_events(&self) -> BoxStream<SharedImportResult> {
70        self.events_shared_result()
71    }
72}
73
74#[cfg(feature = "p2p")]
75#[async_trait::async_trait]
76impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter {
77    fn broadcast_transaction(&self, transaction: Arc<Transaction>) -> anyhow::Result<()> {
78        match &self.service {
79            Some(service) => service.broadcast_transaction(transaction),
80            _ => Ok(()),
81        }
82    }
83
84    fn notify_gossip_transaction_validity(
85        &self,
86        message_info: GossipsubMessageInfo,
87        validity: GossipsubMessageAcceptance,
88    ) -> anyhow::Result<()> {
89        match &self.service {
90            Some(service) => {
91                service.notify_gossip_transaction_validity(message_info, validity)
92            }
93            _ => Ok(()),
94        }
95    }
96}
97
98#[cfg(feature = "p2p")]
99impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter {
100    type GossipedTransaction = TransactionGossipData;
101
102    fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction> {
103        use tokio_stream::{
104            StreamExt,
105            wrappers::BroadcastStream,
106        };
107        match &self.service {
108            Some(service) => Box::pin(
109                BroadcastStream::new(service.subscribe_tx())
110                    .filter_map(|result| result.ok()),
111            ),
112            _ => fuel_core_services::stream::IntoBoxStream::into_boxed(
113                tokio_stream::pending(),
114            ),
115        }
116    }
117
118    fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
119        use tokio_stream::{
120            StreamExt,
121            wrappers::BroadcastStream,
122        };
123        match &self.service {
124            Some(service) => Box::pin(
125                BroadcastStream::new(service.subscribe_new_peers())
126                    .filter_map(|result| result.ok()),
127            ),
128            _ => Box::pin(fuel_core_services::stream::pending()),
129        }
130    }
131}
132
133#[cfg(feature = "p2p")]
134#[async_trait::async_trait]
135impl fuel_core_txpool::ports::P2PRequests for P2PAdapter {
136    async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
137        match &self.service {
138            Some(service) => service.get_all_transactions_ids_from_peer(peer_id).await,
139            _ => Ok(vec![]),
140        }
141    }
142
143    async fn request_txs(
144        &self,
145        peer_id: PeerId,
146        tx_ids: Vec<TxId>,
147    ) -> anyhow::Result<Vec<Option<Transaction>>> {
148        match &self.service {
149            Some(service) => {
150                service
151                    .get_full_transactions_from_peer(peer_id, tx_ids)
152                    .await
153            }
154            _ => Ok(vec![]),
155        }
156    }
157}
158
159#[cfg(not(feature = "p2p"))]
160const _: () = {
161    #[async_trait::async_trait]
162    impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter {
163        fn broadcast_transaction(
164            &self,
165            _transaction: Arc<Transaction>,
166        ) -> anyhow::Result<()> {
167            Ok(())
168        }
169
170        fn notify_gossip_transaction_validity(
171            &self,
172            _message_info: GossipsubMessageInfo,
173            _validity: GossipsubMessageAcceptance,
174        ) -> anyhow::Result<()> {
175            Ok(())
176        }
177    }
178
179    impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter {
180        type GossipedTransaction = TransactionGossipData;
181
182        fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction> {
183            Box::pin(fuel_core_services::stream::pending())
184        }
185
186        fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
187            Box::pin(fuel_core_services::stream::pending())
188        }
189    }
190
191    #[async_trait::async_trait]
192    impl fuel_core_txpool::ports::P2PRequests for P2PAdapter {
193        async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
194            Ok(vec![])
195        }
196
197        async fn request_txs(
198            &self,
199            _peer_id: PeerId,
200            _tx_ids: Vec<TxId>,
201        ) -> anyhow::Result<Vec<Option<Transaction>>> {
202            Ok(vec![])
203        }
204    }
205};
206
207impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValueView {
208    fn contains_tx(&self, tx_id: &TxId) -> StorageResult<bool> {
209        self.storage::<ProcessedTransactions>().contains_key(tx_id)
210    }
211
212    fn utxo(&self, utxo_id: &UtxoId) -> StorageResult<Option<CompressedCoin>> {
213        self.storage::<Coins>()
214            .get(utxo_id)
215            .map(|t| t.map(|t| t.into_owned()))
216    }
217
218    fn contract_exist(&self, contract_id: &ContractId) -> StorageResult<bool> {
219        self.storage::<ContractsRawCode>().contains_key(contract_id)
220    }
221
222    fn blob_exist(&self, blob_id: &BlobId) -> StorageResult<bool> {
223        self.storage::<BlobData>().contains_key(blob_id)
224    }
225
226    fn message(&self, id: &Nonce) -> StorageResult<Option<Message>> {
227        self.storage::<Messages>()
228            .get(id)
229            .map(|t| t.map(|t| t.into_owned()))
230    }
231}
232
233#[async_trait::async_trait]
234impl GasPriceProvider for StaticGasPrice {
235    fn next_gas_price(&self) -> u64 {
236        self.gas_price
237    }
238}
239
240impl ChainStateInfoProviderTrait for ChainStateInfoProvider {
241    fn latest_consensus_parameters(
242        &self,
243    ) -> (ConsensusParametersVersion, Arc<ConsensusParameters>) {
244        self.shared_state.latest_consensus_parameters_with_version()
245    }
246}
247
248impl TxStatusManager for PreconfirmationSender {
249    fn status_update(&self, tx_id: TxId, tx_status: TransactionStatus) {
250        let permit = self.sender_signature_service.try_reserve();
251
252        if let Ok(permit) = permit
253            && let TransactionStatus::SqueezedOut(status) = &tx_status
254        {
255            let preconfirmation = Preconfirmation {
256                tx_id,
257                status: PreconfirmationStatus::SqueezedOut {
258                    reason: status.reason.clone(),
259                },
260            };
261            permit.send(vec![preconfirmation]);
262        }
263
264        self.tx_status_manager_adapter
265            .update_status(tx_id, tx_status);
266    }
267
268    fn preconfirmations_update_listener(
269        &self,
270    ) -> broadcast::Receiver<(TxId, PreConfirmationStatus)> {
271        self.tx_status_manager_adapter
272            .preconfirmations_update_listener()
273    }
274
275    fn squeezed_out_txs(&self, statuses: Vec<(TxId, statuses::SqueezedOut)>) {
276        let permit = self.sender_signature_service.try_reserve();
277        if let Ok(permit) = permit {
278            let preconfirmations = statuses
279                .iter()
280                .map(|(tx_id, status)| Preconfirmation {
281                    tx_id: *tx_id,
282                    status: PreconfirmationStatus::SqueezedOut {
283                        reason: status.reason.clone(),
284                    },
285                })
286                .collect();
287            permit.send(preconfirmations);
288        }
289        self.tx_status_manager_adapter.update_statuses(statuses);
290    }
291}