fuel_core/service/adapters/
txpool.rs1use crate::{
2 database::OnChainIterableKeyValueView,
3 service::adapters::{
4 BlockImporterAdapter,
5 ChainStateInfoProvider,
6 P2PAdapter,
7 PreconfirmationSender,
8 StaticGasPrice,
9 },
10};
11use fuel_core_services::stream::BoxStream;
12use fuel_core_storage::{
13 Result as StorageResult,
14 StorageAsRef,
15 tables::{
16 Coins,
17 ContractsRawCode,
18 Messages,
19 ProcessedTransactions,
20 },
21};
22use fuel_core_txpool::ports::{
23 BlockImporter,
24 ChainStateInfoProvider as ChainStateInfoProviderTrait,
25 GasPriceProvider,
26 TxStatusManager,
27};
28use fuel_core_types::{
29 blockchain::header::ConsensusParametersVersion,
30 entities::{
31 coins::coin::CompressedCoin,
32 relayer::message::Message,
33 },
34 fuel_tx::{
35 BlobId,
36 ConsensusParameters,
37 Transaction,
38 TxId,
39 UtxoId,
40 },
41 fuel_types::{
42 ContractId,
43 Nonce,
44 },
45 fuel_vm::BlobData,
46 services::{
47 block_importer::SharedImportResult,
48 p2p::{
49 GossipsubMessageAcceptance,
50 GossipsubMessageInfo,
51 PeerId,
52 TransactionGossipData,
53 },
54 preconfirmation::{
55 Preconfirmation,
56 PreconfirmationStatus,
57 },
58 transaction_status::{
59 PreConfirmationStatus,
60 TransactionStatus,
61 statuses,
62 },
63 },
64};
65use std::sync::Arc;
66use tokio::sync::broadcast;
67
68impl BlockImporter for BlockImporterAdapter {
69 fn block_events(&self) -> BoxStream<SharedImportResult> {
70 self.events_shared_result()
71 }
72}
73
74#[cfg(feature = "p2p")]
75#[async_trait::async_trait]
76impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter {
77 fn broadcast_transaction(&self, transaction: Arc<Transaction>) -> anyhow::Result<()> {
78 match &self.service {
79 Some(service) => service.broadcast_transaction(transaction),
80 _ => Ok(()),
81 }
82 }
83
84 fn notify_gossip_transaction_validity(
85 &self,
86 message_info: GossipsubMessageInfo,
87 validity: GossipsubMessageAcceptance,
88 ) -> anyhow::Result<()> {
89 match &self.service {
90 Some(service) => {
91 service.notify_gossip_transaction_validity(message_info, validity)
92 }
93 _ => Ok(()),
94 }
95 }
96}
97
98#[cfg(feature = "p2p")]
99impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter {
100 type GossipedTransaction = TransactionGossipData;
101
102 fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction> {
103 use tokio_stream::{
104 StreamExt,
105 wrappers::BroadcastStream,
106 };
107 match &self.service {
108 Some(service) => Box::pin(
109 BroadcastStream::new(service.subscribe_tx())
110 .filter_map(|result| result.ok()),
111 ),
112 _ => fuel_core_services::stream::IntoBoxStream::into_boxed(
113 tokio_stream::pending(),
114 ),
115 }
116 }
117
118 fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
119 use tokio_stream::{
120 StreamExt,
121 wrappers::BroadcastStream,
122 };
123 match &self.service {
124 Some(service) => Box::pin(
125 BroadcastStream::new(service.subscribe_new_peers())
126 .filter_map(|result| result.ok()),
127 ),
128 _ => Box::pin(fuel_core_services::stream::pending()),
129 }
130 }
131}
132
133#[cfg(feature = "p2p")]
134#[async_trait::async_trait]
135impl fuel_core_txpool::ports::P2PRequests for P2PAdapter {
136 async fn request_tx_ids(&self, peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
137 match &self.service {
138 Some(service) => service.get_all_transactions_ids_from_peer(peer_id).await,
139 _ => Ok(vec![]),
140 }
141 }
142
143 async fn request_txs(
144 &self,
145 peer_id: PeerId,
146 tx_ids: Vec<TxId>,
147 ) -> anyhow::Result<Vec<Option<Transaction>>> {
148 match &self.service {
149 Some(service) => {
150 service
151 .get_full_transactions_from_peer(peer_id, tx_ids)
152 .await
153 }
154 _ => Ok(vec![]),
155 }
156 }
157}
158
159#[cfg(not(feature = "p2p"))]
160const _: () = {
161 #[async_trait::async_trait]
162 impl fuel_core_txpool::ports::NotifyP2P for P2PAdapter {
163 fn broadcast_transaction(
164 &self,
165 _transaction: Arc<Transaction>,
166 ) -> anyhow::Result<()> {
167 Ok(())
168 }
169
170 fn notify_gossip_transaction_validity(
171 &self,
172 _message_info: GossipsubMessageInfo,
173 _validity: GossipsubMessageAcceptance,
174 ) -> anyhow::Result<()> {
175 Ok(())
176 }
177 }
178
179 impl fuel_core_txpool::ports::P2PSubscriptions for P2PAdapter {
180 type GossipedTransaction = TransactionGossipData;
181
182 fn gossiped_transaction_events(&self) -> BoxStream<Self::GossipedTransaction> {
183 Box::pin(fuel_core_services::stream::pending())
184 }
185
186 fn subscribe_new_peers(&self) -> BoxStream<PeerId> {
187 Box::pin(fuel_core_services::stream::pending())
188 }
189 }
190
191 #[async_trait::async_trait]
192 impl fuel_core_txpool::ports::P2PRequests for P2PAdapter {
193 async fn request_tx_ids(&self, _peer_id: PeerId) -> anyhow::Result<Vec<TxId>> {
194 Ok(vec![])
195 }
196
197 async fn request_txs(
198 &self,
199 _peer_id: PeerId,
200 _tx_ids: Vec<TxId>,
201 ) -> anyhow::Result<Vec<Option<Transaction>>> {
202 Ok(vec![])
203 }
204 }
205};
206
207impl fuel_core_txpool::ports::TxPoolPersistentStorage for OnChainIterableKeyValueView {
208 fn contains_tx(&self, tx_id: &TxId) -> StorageResult<bool> {
209 self.storage::<ProcessedTransactions>().contains_key(tx_id)
210 }
211
212 fn utxo(&self, utxo_id: &UtxoId) -> StorageResult<Option<CompressedCoin>> {
213 self.storage::<Coins>()
214 .get(utxo_id)
215 .map(|t| t.map(|t| t.into_owned()))
216 }
217
218 fn contract_exist(&self, contract_id: &ContractId) -> StorageResult<bool> {
219 self.storage::<ContractsRawCode>().contains_key(contract_id)
220 }
221
222 fn blob_exist(&self, blob_id: &BlobId) -> StorageResult<bool> {
223 self.storage::<BlobData>().contains_key(blob_id)
224 }
225
226 fn message(&self, id: &Nonce) -> StorageResult<Option<Message>> {
227 self.storage::<Messages>()
228 .get(id)
229 .map(|t| t.map(|t| t.into_owned()))
230 }
231}
232
233#[async_trait::async_trait]
234impl GasPriceProvider for StaticGasPrice {
235 fn next_gas_price(&self) -> u64 {
236 self.gas_price
237 }
238}
239
240impl ChainStateInfoProviderTrait for ChainStateInfoProvider {
241 fn latest_consensus_parameters(
242 &self,
243 ) -> (ConsensusParametersVersion, Arc<ConsensusParameters>) {
244 self.shared_state.latest_consensus_parameters_with_version()
245 }
246}
247
248impl TxStatusManager for PreconfirmationSender {
249 fn status_update(&self, tx_id: TxId, tx_status: TransactionStatus) {
250 let permit = self.sender_signature_service.try_reserve();
251
252 if let Ok(permit) = permit
253 && let TransactionStatus::SqueezedOut(status) = &tx_status
254 {
255 let preconfirmation = Preconfirmation {
256 tx_id,
257 status: PreconfirmationStatus::SqueezedOut {
258 reason: status.reason.clone(),
259 },
260 };
261 permit.send(vec![preconfirmation]);
262 }
263
264 self.tx_status_manager_adapter
265 .update_status(tx_id, tx_status);
266 }
267
268 fn preconfirmations_update_listener(
269 &self,
270 ) -> broadcast::Receiver<(TxId, PreConfirmationStatus)> {
271 self.tx_status_manager_adapter
272 .preconfirmations_update_listener()
273 }
274
275 fn squeezed_out_txs(&self, statuses: Vec<(TxId, statuses::SqueezedOut)>) {
276 let permit = self.sender_signature_service.try_reserve();
277 if let Ok(permit) = permit {
278 let preconfirmations = statuses
279 .iter()
280 .map(|(tx_id, status)| Preconfirmation {
281 tx_id: *tx_id,
282 status: PreconfirmationStatus::SqueezedOut {
283 reason: status.reason.clone(),
284 },
285 })
286 .collect();
287 permit.send(preconfirmations);
288 }
289 self.tx_status_manager_adapter.update_statuses(statuses);
290 }
291}