fuel_core/service/adapters/
executor.rs1use crate::{
2 database::RelayerIterableKeyValueView,
3 service::adapters::{
4 NewTxWaiter,
5 TransactionsSource,
6 },
7};
8use fuel_core_executor::{
9 executor::WaitNewTransactionsResult,
10 ports::{
11 MaybeCheckedTransaction,
12 NewTxWaiterPort,
13 PreconfirmationSenderPort,
14 },
15};
16use fuel_core_txpool::Constraints;
17use fuel_core_types::{
18 blockchain::primitives::DaBlockHeight,
19 services::{
20 preconfirmation::Preconfirmation,
21 relayer::Event,
22 },
23};
24use std::{
25 collections::HashSet,
26 sync::Arc,
27};
28use tokio::sync::mpsc::error::TrySendError;
29
30use super::PreconfirmationSender;
31
32impl fuel_core_executor::ports::TransactionsSource for TransactionsSource {
33 fn next(
34 &self,
35 gas_limit: u64,
36 transactions_limit: u16,
37 block_transaction_size_limit: u32,
38 ) -> Vec<MaybeCheckedTransaction> {
39 self.tx_pool
40 .extract_transactions_for_block(Constraints {
41 minimal_gas_price: self.minimum_gas_price,
42 max_gas: gas_limit,
43 maximum_txs: transactions_limit,
44 maximum_block_size: block_transaction_size_limit,
45 excluded_contracts: HashSet::default(),
46 })
47 .unwrap_or_default()
48 .into_iter()
49 .map(|tx| {
50 let transaction = Arc::unwrap_or_clone(tx);
51 let version = transaction.used_consensus_parameters_version();
52 MaybeCheckedTransaction::CheckedTransaction(transaction.into(), version)
53 })
54 .collect()
55 }
56}
57
58impl fuel_core_executor::ports::RelayerPort for RelayerIterableKeyValueView {
59 fn enabled(&self) -> bool {
60 #[cfg(feature = "relayer")]
61 {
62 true
63 }
64 #[cfg(not(feature = "relayer"))]
65 {
66 false
67 }
68 }
69
70 fn get_events(&self, da_height: &DaBlockHeight) -> anyhow::Result<Vec<Event>> {
71 #[cfg(feature = "relayer")]
72 {
73 use fuel_core_storage::StorageAsRef;
74 let events = self
75 .storage::<fuel_core_relayer::storage::EventsHistory>()
76 .get(da_height)?
77 .map(|cow| cow.into_owned())
78 .unwrap_or_default();
79 Ok(events)
80 }
81 #[cfg(not(feature = "relayer"))]
82 {
83 let _ = da_height;
84 Ok(vec![])
85 }
86 }
87}
88
89impl NewTxWaiterPort for NewTxWaiter {
90 async fn wait_for_new_transactions(&mut self) -> WaitNewTransactionsResult {
91 tokio::select! {
92 _ = tokio::time::sleep_until(self.timeout) => {
93 WaitNewTransactionsResult::Timeout
94 }
95 res = self.receiver.changed() => {
96 match res {
97 Ok(_) => {
98 WaitNewTransactionsResult::NewTransaction
99 }
100 Err(_) => {
101 WaitNewTransactionsResult::Timeout
102 }
103 }
104 }
105 }
106 }
107}
108
109impl PreconfirmationSenderPort for PreconfirmationSender {
110 async fn send(&self, preconfirmations: Vec<Preconfirmation>) {
111 self.tx_status_manager_adapter
113 .tx_status_manager_shared_data
114 .update_preconfirmations(preconfirmations.clone());
115
116 let _ = self.sender_signature_service.send(preconfirmations).await;
119 }
120
121 fn try_send(&self, preconfirmations: Vec<Preconfirmation>) -> Vec<Preconfirmation> {
122 match self.sender_signature_service.try_reserve() {
123 Ok(permit) => {
124 self.tx_status_manager_adapter
126 .tx_status_manager_shared_data
127 .update_preconfirmations(preconfirmations.clone());
128 permit.send(preconfirmations);
129 vec![]
130 }
131 Err(TrySendError::Closed(_)) => {
134 self.tx_status_manager_adapter
135 .tx_status_manager_shared_data
136 .update_preconfirmations(preconfirmations);
137 vec![]
138 }
139 Err(TrySendError::Full(_)) => preconfirmations,
140 }
141 }
142}