Skip to main content

fuel_core/service/
adapters.rs

1use std::{
2    ops::Deref,
3    sync::Arc,
4};
5use tokio::sync::{
6    mpsc,
7    watch,
8};
9
10use fuel_core_consensus_module::{
11    RelayerConsensusConfig,
12    block_verifier::Verifier,
13};
14use fuel_core_executor::executor::OnceTransactionsSource;
15use fuel_core_gas_price_service::{
16    common::cumulative_percentage_change,
17    v1::service::LatestGasPrice,
18};
19use fuel_core_importer::ImporterResult;
20use fuel_core_poa::ports::BlockSigner;
21use fuel_core_services::stream::BoxStream;
22use fuel_core_storage::transactional::Changes;
23use fuel_core_tx_status_manager::{
24    SharedData,
25    TxStatusStream,
26};
27use fuel_core_txpool::ports::GasPriceProvider as TxPoolGasPriceProvider;
28use fuel_core_types::{
29    blockchain::{
30        block::Block,
31        consensus::{
32            Consensus,
33            poa::PoAConsensus,
34        },
35    },
36    fuel_tx::{
37        Bytes32,
38        Transaction,
39    },
40    fuel_types::BlockHeight,
41    services::{
42        block_importer::SharedImportResult,
43        block_producer::Components,
44        executor::{
45            Result as ExecutorResult,
46            UncommittedResult,
47        },
48        preconfirmation::Preconfirmation,
49    },
50    signer::SignMode,
51    tai64::Tai64,
52};
53use fuel_core_upgradable_executor::executor::Executor;
54use tokio::time::Instant;
55
56#[cfg(feature = "p2p")]
57use fuel_core_types::services::p2p::peer_reputation::AppScore;
58
59use crate::{
60    database::{
61        Database,
62        database_description::relayer::Relayer,
63    },
64    fuel_core_graphql_api::ports::GasPriceEstimate,
65    service::{
66        sub_services::{
67            BlockProducerService,
68            TxPoolSharedState,
69        },
70        vm_pool::MemoryPool,
71    },
72};
73
74pub mod block_importer;
75pub mod chain_state_info_provider;
76pub mod compression_adapters;
77pub mod consensus_module;
78pub mod executor;
79pub mod fuel_gas_price_provider;
80pub mod gas_price_adapters;
81pub mod graphql_api;
82pub mod import_result_provider;
83
84#[cfg(feature = "p2p")]
85pub mod p2p;
86pub mod producer;
87pub mod ready_signal;
88#[cfg(feature = "relayer")]
89pub mod relayer;
90#[cfg(feature = "rpc")]
91pub mod rpc;
92#[cfg(feature = "shared-sequencer")]
93pub mod shared_sequencer;
94#[cfg(feature = "p2p")]
95pub mod sync;
96pub mod tx_status_manager;
97pub mod txpool;
98
99#[derive(Debug, Clone)]
100pub struct ChainStateInfoProvider {
101    shared_state: chain_state_info_provider::SharedState,
102}
103
104impl ChainStateInfoProvider {
105    pub fn new(shared_state: chain_state_info_provider::SharedState) -> Self {
106        Self { shared_state }
107    }
108}
109
/// A gas price that is fixed at construction time.
#[derive(Debug, Clone)]
pub struct StaticGasPrice {
    /// The fixed gas price value.
    pub gas_price: u64,
}

impl StaticGasPrice {
    /// Creates a `StaticGasPrice` holding `gas_price`.
    pub fn new(gas_price: u64) -> Self {
        Self { gas_price }
    }
}
120
/// Property tests for [`UniversalGasPriceProvider`].
#[cfg(test)]
mod universal_gas_price_provider_tests {
    // Test names use the `subject__expectation` double-underscore convention.
    #![allow(non_snake_case)]

    use proptest::proptest;

    use super::*;

    /// Checks that the worst-case estimate at `starting_height + block_horizon`
    /// is at least the price obtained by compounding `percentage` once per
    /// block with saturating integer arithmetic.
    fn _worst_case__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        block_horizon: u32,
        percentage: u16,
    ) {
        // given
        let subject =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

        // when
        let target_height = starting_height.saturating_add(block_horizon);
        let estimated = subject.worst_case_gas_price(target_height.into()).unwrap();

        // then
        // Reference value: apply the per-block increase `block_horizon` times.
        let mut lower_bound = gas_price;

        for _ in 0..block_horizon {
            let change_amount = lower_bound
                .saturating_mul(percentage as u64)
                .saturating_div(100);
            lower_bound = lower_bound.saturating_add(change_amount);
        }

        assert!(estimated >= lower_bound);
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16,
        ) {
            _worst_case__correctly_calculates_value(
                gas_price,
                starting_height,
                block_horizon,
                percentage,
            );
        }
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__never_overflows(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16
        ) {
            // given
            let subject = UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

            // when
            let target_height = starting_height.saturating_add(block_horizon);

            let _ = subject.worst_case_gas_price(target_height.into());

            // then
            // doesn't panic with an overflow
        }
    }

    /// Checks that the next-block price is at least the latest price plus
    /// `percentage` percent of it, computed with saturating integer arithmetic.
    fn _next_gas_price__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        percentage: u16,
    ) {
        // given
        let subject =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

        // when
        let estimated = subject.next_gas_price();

        // then
        let change_amount = gas_price
            .saturating_mul(percentage as u64)
            .saturating_div(100);
        let lower_bound = gas_price.saturating_add(change_amount);

        assert!(estimated >= lower_bound);
    }

    proptest! {
        #[test]
        fn next_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            percentage: u16,
        ) {
            _next_gas_price__correctly_calculates_value(
                gas_price,
                starting_height,
                percentage,
            )
        }
    }
}
229
230/// Allows communication from other service with more recent gas price data
231/// `Height` refers to the height of the block at which the gas price was last updated
232/// `GasPrice` refers to the gas price at the last updated block
233#[derive(Debug)]
234pub struct UniversalGasPriceProvider<Height, GasPrice> {
235    /// Shared state of latest gas price data
236    latest_gas_price: LatestGasPrice<Height, GasPrice>,
237    /// The max percentage the gas price can increase per block
238    percentage: u16,
239}
240
241impl<Height, GasPrice> Clone for UniversalGasPriceProvider<Height, GasPrice> {
242    fn clone(&self) -> Self {
243        Self {
244            latest_gas_price: self.latest_gas_price.clone(),
245            percentage: self.percentage,
246        }
247    }
248}
249
250impl<Height, GasPrice> UniversalGasPriceProvider<Height, GasPrice> {
251    #[cfg(test)]
252    pub fn new(height: Height, price: GasPrice, percentage: u16) -> Self {
253        let latest_gas_price = LatestGasPrice::new(height, price);
254        Self {
255            latest_gas_price,
256            percentage,
257        }
258    }
259
260    pub fn new_from_inner(
261        inner: LatestGasPrice<Height, GasPrice>,
262        percentage: u16,
263    ) -> Self {
264        Self {
265            latest_gas_price: inner,
266            percentage,
267        }
268    }
269}
270
271impl<Height: Copy, GasPrice: Copy> UniversalGasPriceProvider<Height, GasPrice> {
272    fn get_height_and_gas_price(&self) -> (Height, GasPrice) {
273        self.latest_gas_price.get()
274    }
275}
276
277impl UniversalGasPriceProvider<u32, u64> {
278    pub fn inner_next_gas_price(&self) -> u64 {
279        let (_, latest_price) = self.get_height_and_gas_price();
280        let percentage = self.percentage;
281
282        let change = latest_price
283            .saturating_mul(percentage as u64)
284            .saturating_div(100);
285
286        latest_price.saturating_add(change)
287    }
288}
289
290impl TxPoolGasPriceProvider for UniversalGasPriceProvider<u32, u64> {
291    fn next_gas_price(&self) -> fuel_core_txpool::GasPrice {
292        self.inner_next_gas_price()
293    }
294}
295
296impl GasPriceEstimate for UniversalGasPriceProvider<u32, u64> {
297    fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
298        let (best_height, best_gas_price) = self.get_height_and_gas_price();
299        let percentage = self.percentage;
300
301        let worst = cumulative_percentage_change(
302            best_gas_price,
303            best_height,
304            percentage as u64,
305            height.into(),
306        );
307        Some(worst)
308    }
309}
310
311#[derive(Clone)]
312pub struct PoAAdapter {
313    shared_state: Option<fuel_core_poa::service::SharedState>,
314}
315
316#[derive(Clone)]
317pub struct TxPoolAdapter {
318    service: TxPoolSharedState,
319}
320
321impl TxPoolAdapter {
322    pub fn new(service: TxPoolSharedState) -> Self {
323        Self { service }
324    }
325}
326
327pub struct TransactionsSource {
328    tx_pool: TxPoolSharedState,
329    minimum_gas_price: u64,
330}
331
332impl TransactionsSource {
333    pub fn new(minimum_gas_price: u64, tx_pool: TxPoolSharedState) -> Self {
334        Self {
335            tx_pool,
336            minimum_gas_price,
337        }
338    }
339}
340
341pub struct NewTxWaiter {
342    pub receiver: watch::Receiver<()>,
343    pub timeout: Instant,
344}
345
346impl NewTxWaiter {
347    pub fn new(receiver: watch::Receiver<()>, timeout: Instant) -> Self {
348        Self { receiver, timeout }
349    }
350}
351
352#[derive(Clone)]
353pub struct PreconfirmationSender {
354    pub sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
355    pub tx_status_manager_adapter: TxStatusManagerAdapter,
356}
357
358impl PreconfirmationSender {
359    pub fn new(
360        sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
361        tx_status_manager_adapter: TxStatusManagerAdapter,
362    ) -> Self {
363        Self {
364            sender_signature_service,
365            tx_status_manager_adapter,
366        }
367    }
368}
369
370#[derive(Clone)]
371pub struct ExecutorAdapter {
372    pub(crate) executor: Arc<Executor<Database, Database<Relayer>>>,
373    pub new_txs_watcher: watch::Receiver<()>,
374    pub preconfirmation_sender: PreconfirmationSender,
375}
376
377impl ExecutorAdapter {
378    pub fn new(
379        database: Database,
380        relayer_database: Database<Relayer>,
381        config: fuel_core_upgradable_executor::config::Config,
382        new_txs_watcher: watch::Receiver<()>,
383        preconfirmation_sender: PreconfirmationSender,
384    ) -> Self {
385        let executor = Executor::new(database, relayer_database, config);
386        Self {
387            executor: Arc::new(executor),
388            new_txs_watcher,
389            preconfirmation_sender,
390        }
391    }
392
393    pub fn produce_without_commit_from_vector(
394        &self,
395        component: Components<Vec<Transaction>>,
396    ) -> ExecutorResult<UncommittedResult<Changes>> {
397        let new_components = Components {
398            header_to_produce: component.header_to_produce,
399            transactions_source: OnceTransactionsSource::new(
400                component.transactions_source,
401            ),
402            gas_price: component.gas_price,
403            coinbase_recipient: component.coinbase_recipient,
404        };
405
406        self.executor
407            .produce_without_commit_with_source_direct_resolve(new_components)
408    }
409}
410
411#[derive(Clone)]
412pub struct VerifierAdapter {
413    pub block_verifier: Arc<Verifier<Database>>,
414}
415
416#[derive(Clone)]
417pub struct ConsensusAdapter {
418    pub block_verifier: Arc<Verifier<Database>>,
419    pub config: RelayerConsensusConfig,
420    pub maybe_relayer: MaybeRelayerAdapter,
421}
422
423impl ConsensusAdapter {
424    pub fn new(
425        block_verifier: VerifierAdapter,
426        config: RelayerConsensusConfig,
427        maybe_relayer: MaybeRelayerAdapter,
428    ) -> Self {
429        Self {
430            block_verifier: block_verifier.block_verifier,
431            config,
432            maybe_relayer,
433        }
434    }
435}
436
/// Relayer handle whose fields exist only when the `relayer` feature is
/// enabled; without the feature this is an empty struct.
#[derive(Clone)]
pub struct MaybeRelayerAdapter {
    /// Optional shared state of the relayer service.
    #[cfg(feature = "relayer")]
    pub relayer_synced: Option<fuel_core_relayer::SharedState>,
    /// Database holding relayer data.
    #[cfg(feature = "relayer")]
    pub relayer_database: Database<Relayer>,
    /// A DA-layer block height.
    // NOTE(review): presumably the DA height at which the contracts were
    // deployed — confirm against the configuration that fills this in.
    #[cfg(feature = "relayer")]
    pub da_deploy_height: fuel_core_types::blockchain::primitives::DaBlockHeight,
}
446
447#[derive(Clone)]
448pub struct BlockProducerAdapter {
449    pub block_producer: Arc<BlockProducerService>,
450}
451
452#[derive(Clone)]
453pub struct BlockImporterAdapter {
454    pub block_importer: Arc<fuel_core_importer::Importer>,
455    pub database: Database,
456}
457
458impl BlockImporterAdapter {
459    pub fn events(&self) -> BoxStream<ImporterResult> {
460        use futures::StreamExt;
461        fuel_core_services::stream::IntoBoxStream::into_boxed(
462            tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
463                .filter_map(|r| futures::future::ready(r.ok())),
464        )
465    }
466
467    pub fn events_shared_result(&self) -> BoxStream<SharedImportResult> {
468        use futures::StreamExt;
469        fuel_core_services::stream::IntoBoxStream::into_boxed(
470            tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
471                .filter_map(|r| futures::future::ready(r.ok()))
472                .map(|r| r.shared_result),
473        )
474    }
475}
476
477#[derive(Clone)]
478pub struct TxStatusManagerAdapter {
479    tx_status_manager_shared_data: SharedData,
480}
481
482impl Deref for TxStatusManagerAdapter {
483    type Target = SharedData;
484
485    fn deref(&self) -> &Self::Target {
486        &self.tx_status_manager_shared_data
487    }
488}
489
490impl TxStatusManagerAdapter {
491    pub fn new(tx_status_manager_shared_data: SharedData) -> Self {
492        Self {
493            tx_status_manager_shared_data,
494        }
495    }
496
497    /// Subscribe to status updates for a transaction.
498    pub async fn tx_update_subscribe(
499        &self,
500        tx_id: Bytes32,
501    ) -> anyhow::Result<TxStatusStream> {
502        self.tx_status_manager_shared_data.subscribe(tx_id).await
503    }
504}
505
506#[derive(Clone)]
507pub struct FuelBlockSigner {
508    mode: SignMode,
509}
510impl FuelBlockSigner {
511    pub fn new(mode: SignMode) -> Self {
512        Self { mode }
513    }
514}
515
516#[async_trait::async_trait]
517impl BlockSigner for FuelBlockSigner {
518    async fn seal_block(&self, block: &Block) -> anyhow::Result<Consensus> {
519        let block_hash = block.id();
520        let message = block_hash.into_message();
521        let signature = self.mode.sign_message(message).await?;
522        Ok(Consensus::PoA(PoAConsensus::new(signature)))
523    }
524
525    fn is_available(&self) -> bool {
526        self.mode.is_available()
527    }
528}
529
#[cfg(feature = "shared-sequencer")]
#[async_trait::async_trait]
impl fuel_core_shared_sequencer::ports::Signer for FuelBlockSigner {
    /// Signs raw `data` with the underlying signing mode.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the signing mode.
    async fn sign(
        &self,
        data: &[u8],
    ) -> anyhow::Result<fuel_core_types::fuel_crypto::Signature> {
        Ok(self.mode.sign(data).await?)
    }

    /// Returns the signer's public key in the `cosmrs` representation.
    ///
    /// # Panics
    ///
    /// Panics with "Invalid public key" if retrieving the verifying key
    /// fails, and with "Public key not available" if the signing mode has no
    /// verifying key.
    fn public_key(&self) -> cosmrs::crypto::PublicKey {
        let pubkey = self
            .mode
            .verifying_key()
            .expect("Invalid public key")
            .expect("Public key not available");
        cosmrs::crypto::PublicKey::from(pubkey)
    }

    /// Reports whether the underlying signing mode is available.
    fn is_available(&self) -> bool {
        self.mode.is_available()
    }
}
553
/// P2P adapter used when the `p2p` feature is enabled.
#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct P2PAdapter {
    /// Optional shared state of the P2P service.
    service: Option<fuel_core_p2p::service::SharedState>,
    /// Scores applied when reporting on peers.
    peer_report_config: PeerReportConfig,
}
560
/// One [`AppScore`] per peer-reporting event; only available with the `p2p`
/// feature.
#[cfg(feature = "p2p")]
#[derive(Clone, Default)]
pub struct PeerReportConfig {
    /// Score for a successful block import.
    pub successful_block_import: AppScore,
    /// Score for missing block headers.
    pub missing_block_headers: AppScore,
    /// Score for a bad block header.
    pub bad_block_header: AppScore,
    /// Score for missing transactions.
    pub missing_transactions: AppScore,
    /// Score for invalid transactions.
    pub invalid_transactions: AppScore,
}
570
/// Field-less stand-in for the P2P adapter when the `p2p` feature is disabled.
#[cfg(not(feature = "p2p"))]
#[derive(Default, Clone)]
pub struct P2PAdapter;
574
#[cfg(feature = "p2p")]
impl P2PAdapter {
    /// Creates the adapter from an optional P2P shared state and the peer
    /// report configuration.
    pub fn new(
        service: Option<fuel_core_p2p::service::SharedState>,
        peer_report_config: PeerReportConfig,
    ) -> Self {
        Self {
            service,
            peer_report_config,
        }
    }
}
587
588#[cfg(not(feature = "p2p"))]
589impl P2PAdapter {
590    pub fn new() -> Self {
591        Default::default()
592    }
593}
594
595#[derive(Clone)]
596pub struct SharedMemoryPool {
597    memory_pool: MemoryPool,
598}
599
600impl SharedMemoryPool {
601    pub fn new(number_of_instances: usize) -> Self {
602        Self {
603            memory_pool: MemoryPool::new(number_of_instances),
604        }
605    }
606}
607
608pub struct SystemTime;
609
610impl fuel_core_poa::ports::GetTime for SystemTime {
611    fn now(&self) -> Tai64 {
612        Tai64::now()
613    }
614}