fuel_core/service/adapters/
executor.rs

1use crate::{
2    database::RelayerIterableKeyValueView,
3    service::adapters::{
4        NewTxWaiter,
5        TransactionsSource,
6    },
7};
8use fuel_core_executor::{
9    executor::WaitNewTransactionsResult,
10    ports::{
11        MaybeCheckedTransaction,
12        NewTxWaiterPort,
13        PreconfirmationSenderPort,
14    },
15};
16use fuel_core_txpool::Constraints;
17use fuel_core_types::{
18    blockchain::primitives::DaBlockHeight,
19    services::{
20        preconfirmation::Preconfirmation,
21        relayer::Event,
22    },
23};
24use std::{
25    collections::HashSet,
26    sync::Arc,
27};
28use tokio::sync::mpsc::error::TrySendError;
29
30use super::PreconfirmationSender;
31
32impl fuel_core_executor::ports::TransactionsSource for TransactionsSource {
33    fn next(
34        &self,
35        gas_limit: u64,
36        transactions_limit: u16,
37        block_transaction_size_limit: u32,
38    ) -> Vec<MaybeCheckedTransaction> {
39        self.tx_pool
40            .extract_transactions_for_block(Constraints {
41                minimal_gas_price: self.minimum_gas_price,
42                max_gas: gas_limit,
43                maximum_txs: transactions_limit,
44                maximum_block_size: block_transaction_size_limit,
45                excluded_contracts: HashSet::default(),
46            })
47            .unwrap_or_default()
48            .into_iter()
49            .map(|tx| {
50                let transaction = Arc::unwrap_or_clone(tx);
51                let version = transaction.used_consensus_parameters_version();
52                MaybeCheckedTransaction::CheckedTransaction(transaction.into(), version)
53            })
54            .collect()
55    }
56}
57
58impl fuel_core_executor::ports::RelayerPort for RelayerIterableKeyValueView {
59    fn enabled(&self) -> bool {
60        #[cfg(feature = "relayer")]
61        {
62            true
63        }
64        #[cfg(not(feature = "relayer"))]
65        {
66            false
67        }
68    }
69
70    fn get_events(&self, da_height: &DaBlockHeight) -> anyhow::Result<Vec<Event>> {
71        #[cfg(feature = "relayer")]
72        {
73            use fuel_core_storage::StorageAsRef;
74            let events = self
75                .storage::<fuel_core_relayer::storage::EventsHistory>()
76                .get(da_height)?
77                .map(|cow| cow.into_owned())
78                .unwrap_or_default();
79            Ok(events)
80        }
81        #[cfg(not(feature = "relayer"))]
82        {
83            let _ = da_height;
84            Ok(vec![])
85        }
86    }
87}
88
89impl NewTxWaiterPort for NewTxWaiter {
90    async fn wait_for_new_transactions(&mut self) -> WaitNewTransactionsResult {
91        tokio::select! {
92            _ = tokio::time::sleep_until(self.timeout) => {
93                WaitNewTransactionsResult::Timeout
94            }
95            res = self.receiver.changed() => {
96                match res {
97                    Ok(_) => {
98                        WaitNewTransactionsResult::NewTransaction
99                    }
100                    Err(_) => {
101                        WaitNewTransactionsResult::Timeout
102                    }
103                }
104            }
105        }
106    }
107}
108
109impl PreconfirmationSenderPort for PreconfirmationSender {
110    async fn send(&self, preconfirmations: Vec<Preconfirmation>) {
111        // TODO: Avoid cloning of the `preconfirmations`
112        self.tx_status_manager_adapter
113            .tx_status_manager_shared_data
114            .update_preconfirmations(preconfirmations.clone());
115
116        // If the receiver is closed, it means no one is listening to the preconfirmations and so we can drop them.
117        // We don't consider this an error.
118        let _ = self.sender_signature_service.send(preconfirmations).await;
119    }
120
121    fn try_send(&self, preconfirmations: Vec<Preconfirmation>) -> Vec<Preconfirmation> {
122        match self.sender_signature_service.try_reserve() {
123            Ok(permit) => {
124                // TODO: Avoid cloning of the `preconfirmations`
125                self.tx_status_manager_adapter
126                    .tx_status_manager_shared_data
127                    .update_preconfirmations(preconfirmations.clone());
128                permit.send(preconfirmations);
129                vec![]
130            }
131            // If the receiver is closed, it means no one is listening to the preconfirmations and so we can drop them.
132            // We don't consider this an error.
133            Err(TrySendError::Closed(_)) => {
134                self.tx_status_manager_adapter
135                    .tx_status_manager_shared_data
136                    .update_preconfirmations(preconfirmations);
137                vec![]
138            }
139            Err(TrySendError::Full(_)) => preconfirmations,
140        }
141    }
142}