Skip to main content

fuel_core/service/
adapters.rs

1use std::{
2    ops::Deref,
3    sync::Arc,
4};
5use tokio::sync::{
6    mpsc,
7    watch,
8};
9
10use fuel_core_consensus_module::{
11    RelayerConsensusConfig,
12    block_verifier::Verifier,
13};
14use fuel_core_executor::executor::OnceTransactionsSource;
15use fuel_core_gas_price_service::{
16    common::cumulative_percentage_change,
17    v1::service::LatestGasPrice,
18};
19use fuel_core_importer::ImporterResult;
20use fuel_core_poa::ports::BlockSigner;
21use fuel_core_services::stream::BoxStream;
22use fuel_core_storage::transactional::Changes;
23use fuel_core_tx_status_manager::{
24    SharedData,
25    TxStatusStream,
26};
27use fuel_core_txpool::ports::GasPriceProvider as TxPoolGasPriceProvider;
28use fuel_core_types::{
29    blockchain::{
30        block::Block,
31        consensus::{
32            Consensus,
33            poa::PoAConsensus,
34        },
35    },
36    fuel_tx::{
37        Bytes32,
38        Transaction,
39    },
40    fuel_types::BlockHeight,
41    services::{
42        block_importer::SharedImportResult,
43        block_producer::Components,
44        executor::{
45            Result as ExecutorResult,
46            UncommittedResult,
47        },
48        preconfirmation::Preconfirmation,
49    },
50    signer::SignMode,
51    tai64::Tai64,
52};
53use fuel_core_upgradable_executor::executor::Executor;
54use tokio::time::Instant;
55
56#[cfg(feature = "p2p")]
57use fuel_core_types::services::p2p::peer_reputation::AppScore;
58
59use crate::{
60    database::{
61        Database,
62        database_description::relayer::Relayer,
63    },
64    fuel_core_graphql_api::ports::GasPriceEstimate,
65    service::{
66        sub_services::{
67            BlockProducerService,
68            TxPoolSharedState,
69        },
70        vm_pool::MemoryPool,
71    },
72};
73
74pub mod block_importer;
75pub mod chain_state_info_provider;
76pub mod compression_adapters;
77pub mod consensus_module;
78pub mod executor;
79pub mod fuel_gas_price_provider;
80pub mod gas_price_adapters;
81pub mod graphql_api;
82pub mod import_result_provider;
83#[cfg(feature = "p2p")]
84pub mod p2p;
85pub mod producer;
86pub mod ready_signal;
87#[cfg(feature = "relayer")]
88pub mod relayer;
89#[cfg(feature = "shared-sequencer")]
90pub mod shared_sequencer;
91#[cfg(feature = "p2p")]
92pub mod sync;
93pub mod tx_status_manager;
94pub mod txpool;
95
96#[derive(Debug, Clone)]
97pub struct ChainStateInfoProvider {
98    shared_state: chain_state_info_provider::SharedState,
99}
100
101impl ChainStateInfoProvider {
102    pub fn new(shared_state: chain_state_info_provider::SharedState) -> Self {
103        Self { shared_state }
104    }
105}
106
/// A gas price value stored as a plain `u64`.
#[derive(Debug, Clone)]
pub struct StaticGasPrice {
    /// The stored gas price.
    pub gas_price: u64,
}

impl StaticGasPrice {
    /// Creates a `StaticGasPrice` holding `gas_price`.
    pub fn new(gas_price: u64) -> Self {
        StaticGasPrice { gas_price }
    }
}
117
/// Property tests for `UniversalGasPriceProvider<u32, u64>`.
///
/// Each property checks that the provider's estimate is never below a
/// reference price obtained by compounding the per-block percentage increase
/// with saturating integer arithmetic.
#[cfg(test)]
mod universal_gas_price_provider_tests {
    #![allow(non_snake_case)]

    use proptest::proptest;

    use super::*;

    /// Reference model: applies a `percentage` increase to `gas_price` once
    /// per block for `blocks` blocks, saturating instead of overflowing.
    fn compound_increase(gas_price: u64, blocks: u32, percentage: u16) -> u64 {
        (0..blocks).fold(gas_price, |price, _| {
            let change_amount = price
                .saturating_mul(u64::from(percentage))
                .saturating_div(100);
            price.saturating_add(change_amount)
        })
    }

    /// Asserts that the worst-case estimate for
    /// `starting_height + block_horizon` is at least the reference price.
    fn _worst_case__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        block_horizon: u32,
        percentage: u16,
    ) {
        // given
        let subject =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

        // when
        let target_height = starting_height.saturating_add(block_horizon);
        let estimated = subject.worst_case_gas_price(target_height.into()).unwrap();

        // then
        // The target height is clamped at `u32::MAX`, so the number of blocks
        // the provider is actually asked about can be smaller than
        // `block_horizon`. The reference must compound over that same number
        // of blocks, otherwise the property fails spuriously for starting
        // heights close to `u32::MAX`.
        let elapsed_blocks = target_height.saturating_sub(starting_height);
        let actual = compound_increase(gas_price, elapsed_blocks, percentage);

        assert!(estimated >= actual);
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16,
        ) {
            _worst_case__correctly_calculates_value(
                gas_price,
                starting_height,
                block_horizon,
                percentage,
            );
        }
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__never_overflows(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16
        ) {
            // given
            let subject = UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

            // when
            let target_height = starting_height.saturating_add(block_horizon);

            let _ = subject.worst_case_gas_price(target_height.into());

            // then
            // doesn't panic with an overflow
        }
    }

    /// Asserts that `next_gas_price` is at least the reference price after
    /// exactly one block.
    fn _next_gas_price__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        percentage: u16,
    ) {
        // given
        let subject =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

        // when
        let estimated = subject.next_gas_price();

        // then
        let actual = compound_increase(gas_price, 1, percentage);

        assert!(estimated >= actual);
    }

    proptest! {
        #[test]
        fn next_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            percentage: u16,
        ) {
            _next_gas_price__correctly_calculates_value(
                gas_price,
                starting_height,
                percentage,
            )
        }
    }
}
226
227/// Allows communication from other service with more recent gas price data
228/// `Height` refers to the height of the block at which the gas price was last updated
229/// `GasPrice` refers to the gas price at the last updated block
230#[derive(Debug)]
231pub struct UniversalGasPriceProvider<Height, GasPrice> {
232    /// Shared state of latest gas price data
233    latest_gas_price: LatestGasPrice<Height, GasPrice>,
234    /// The max percentage the gas price can increase per block
235    percentage: u16,
236}
237
238impl<Height, GasPrice> Clone for UniversalGasPriceProvider<Height, GasPrice> {
239    fn clone(&self) -> Self {
240        Self {
241            latest_gas_price: self.latest_gas_price.clone(),
242            percentage: self.percentage,
243        }
244    }
245}
246
247impl<Height, GasPrice> UniversalGasPriceProvider<Height, GasPrice> {
248    #[cfg(test)]
249    pub fn new(height: Height, price: GasPrice, percentage: u16) -> Self {
250        let latest_gas_price = LatestGasPrice::new(height, price);
251        Self {
252            latest_gas_price,
253            percentage,
254        }
255    }
256
257    pub fn new_from_inner(
258        inner: LatestGasPrice<Height, GasPrice>,
259        percentage: u16,
260    ) -> Self {
261        Self {
262            latest_gas_price: inner,
263            percentage,
264        }
265    }
266}
267
268impl<Height: Copy, GasPrice: Copy> UniversalGasPriceProvider<Height, GasPrice> {
269    fn get_height_and_gas_price(&self) -> (Height, GasPrice) {
270        self.latest_gas_price.get()
271    }
272}
273
274impl UniversalGasPriceProvider<u32, u64> {
275    pub fn inner_next_gas_price(&self) -> u64 {
276        let (_, latest_price) = self.get_height_and_gas_price();
277        let percentage = self.percentage;
278
279        let change = latest_price
280            .saturating_mul(percentage as u64)
281            .saturating_div(100);
282
283        latest_price.saturating_add(change)
284    }
285}
286
287impl TxPoolGasPriceProvider for UniversalGasPriceProvider<u32, u64> {
288    fn next_gas_price(&self) -> fuel_core_txpool::GasPrice {
289        self.inner_next_gas_price()
290    }
291}
292
293impl GasPriceEstimate for UniversalGasPriceProvider<u32, u64> {
294    fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
295        let (best_height, best_gas_price) = self.get_height_and_gas_price();
296        let percentage = self.percentage;
297
298        let worst = cumulative_percentage_change(
299            best_gas_price,
300            best_height,
301            percentage as u64,
302            height.into(),
303        );
304        Some(worst)
305    }
306}
307
308#[derive(Clone)]
309pub struct PoAAdapter {
310    shared_state: Option<fuel_core_poa::service::SharedState>,
311}
312
313#[derive(Clone)]
314pub struct TxPoolAdapter {
315    service: TxPoolSharedState,
316}
317
318impl TxPoolAdapter {
319    pub fn new(service: TxPoolSharedState) -> Self {
320        Self { service }
321    }
322}
323
324pub struct TransactionsSource {
325    tx_pool: TxPoolSharedState,
326    minimum_gas_price: u64,
327}
328
329impl TransactionsSource {
330    pub fn new(minimum_gas_price: u64, tx_pool: TxPoolSharedState) -> Self {
331        Self {
332            tx_pool,
333            minimum_gas_price,
334        }
335    }
336}
337
338pub struct NewTxWaiter {
339    pub receiver: watch::Receiver<()>,
340    pub timeout: Instant,
341}
342
343impl NewTxWaiter {
344    pub fn new(receiver: watch::Receiver<()>, timeout: Instant) -> Self {
345        Self { receiver, timeout }
346    }
347}
348
349#[derive(Clone)]
350pub struct PreconfirmationSender {
351    pub sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
352    pub tx_status_manager_adapter: TxStatusManagerAdapter,
353}
354
355impl PreconfirmationSender {
356    pub fn new(
357        sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
358        tx_status_manager_adapter: TxStatusManagerAdapter,
359    ) -> Self {
360        Self {
361            sender_signature_service,
362            tx_status_manager_adapter,
363        }
364    }
365}
366
367#[derive(Clone)]
368pub struct ExecutorAdapter {
369    pub(crate) executor: Arc<Executor<Database, Database<Relayer>>>,
370    pub new_txs_watcher: watch::Receiver<()>,
371    pub preconfirmation_sender: PreconfirmationSender,
372}
373
374impl ExecutorAdapter {
375    pub fn new(
376        database: Database,
377        relayer_database: Database<Relayer>,
378        config: fuel_core_upgradable_executor::config::Config,
379        new_txs_watcher: watch::Receiver<()>,
380        preconfirmation_sender: PreconfirmationSender,
381    ) -> Self {
382        let executor = Executor::new(database, relayer_database, config);
383        Self {
384            executor: Arc::new(executor),
385            new_txs_watcher,
386            preconfirmation_sender,
387        }
388    }
389
390    pub fn produce_without_commit_from_vector(
391        &self,
392        component: Components<Vec<Transaction>>,
393    ) -> ExecutorResult<UncommittedResult<Changes>> {
394        let new_components = Components {
395            header_to_produce: component.header_to_produce,
396            transactions_source: OnceTransactionsSource::new(
397                component.transactions_source,
398            ),
399            gas_price: component.gas_price,
400            coinbase_recipient: component.coinbase_recipient,
401        };
402
403        self.executor
404            .produce_without_commit_with_source_direct_resolve(new_components)
405    }
406}
407
408#[derive(Clone)]
409pub struct VerifierAdapter {
410    pub block_verifier: Arc<Verifier<Database>>,
411}
412
413#[derive(Clone)]
414pub struct ConsensusAdapter {
415    pub block_verifier: Arc<Verifier<Database>>,
416    pub config: RelayerConsensusConfig,
417    pub maybe_relayer: MaybeRelayerAdapter,
418}
419
420impl ConsensusAdapter {
421    pub fn new(
422        block_verifier: VerifierAdapter,
423        config: RelayerConsensusConfig,
424        maybe_relayer: MaybeRelayerAdapter,
425    ) -> Self {
426        Self {
427            block_verifier: block_verifier.block_verifier,
428            config,
429            maybe_relayer,
430        }
431    }
432}
433
/// Relayer adapter whose fields exist only when the `relayer` feature is
/// enabled; without that feature the struct has no fields.
#[derive(Clone)]
pub struct MaybeRelayerAdapter {
    /// Optional shared state of the relayer service.
    #[cfg(feature = "relayer")]
    pub relayer_synced: Option<fuel_core_relayer::SharedState>,
    /// Database using the relayer description.
    #[cfg(feature = "relayer")]
    pub relayer_database: Database<Relayer>,
    /// A DA block height.
    /// NOTE(review): presumably the DA height at which the contracts were
    /// deployed — confirm against the configuration that fills this in.
    #[cfg(feature = "relayer")]
    pub da_deploy_height: fuel_core_types::blockchain::primitives::DaBlockHeight,
}
443
444#[derive(Clone)]
445pub struct BlockProducerAdapter {
446    pub block_producer: Arc<BlockProducerService>,
447}
448
449#[derive(Clone)]
450pub struct BlockImporterAdapter {
451    pub block_importer: Arc<fuel_core_importer::Importer>,
452    pub database: Database,
453}
454
455impl BlockImporterAdapter {
456    pub fn events(&self) -> BoxStream<ImporterResult> {
457        use futures::StreamExt;
458        fuel_core_services::stream::IntoBoxStream::into_boxed(
459            tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
460                .filter_map(|r| futures::future::ready(r.ok())),
461        )
462    }
463
464    pub fn events_shared_result(&self) -> BoxStream<SharedImportResult> {
465        use futures::StreamExt;
466        fuel_core_services::stream::IntoBoxStream::into_boxed(
467            tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
468                .filter_map(|r| futures::future::ready(r.ok()))
469                .map(|r| r.shared_result),
470        )
471    }
472}
473
474#[derive(Clone)]
475pub struct TxStatusManagerAdapter {
476    tx_status_manager_shared_data: SharedData,
477}
478
479impl Deref for TxStatusManagerAdapter {
480    type Target = SharedData;
481
482    fn deref(&self) -> &Self::Target {
483        &self.tx_status_manager_shared_data
484    }
485}
486
487impl TxStatusManagerAdapter {
488    pub fn new(tx_status_manager_shared_data: SharedData) -> Self {
489        Self {
490            tx_status_manager_shared_data,
491        }
492    }
493
494    /// Subscribe to status updates for a transaction.
495    pub async fn tx_update_subscribe(
496        &self,
497        tx_id: Bytes32,
498    ) -> anyhow::Result<TxStatusStream> {
499        self.tx_status_manager_shared_data.subscribe(tx_id).await
500    }
501}
502
503#[derive(Clone)]
504pub struct FuelBlockSigner {
505    mode: SignMode,
506}
507impl FuelBlockSigner {
508    pub fn new(mode: SignMode) -> Self {
509        Self { mode }
510    }
511}
512
513#[async_trait::async_trait]
514impl BlockSigner for FuelBlockSigner {
515    async fn seal_block(&self, block: &Block) -> anyhow::Result<Consensus> {
516        let block_hash = block.id();
517        let message = block_hash.into_message();
518        let signature = self.mode.sign_message(message).await?;
519        Ok(Consensus::PoA(PoAConsensus::new(signature)))
520    }
521
522    fn is_available(&self) -> bool {
523        self.mode.is_available()
524    }
525}
526
#[cfg(feature = "shared-sequencer")]
#[async_trait::async_trait]
impl fuel_core_shared_sequencer::ports::Signer for FuelBlockSigner {
    /// Signs raw `data` with the configured signing mode, converting any
    /// signing error into `anyhow::Error`.
    async fn sign(
        &self,
        data: &[u8],
    ) -> anyhow::Result<fuel_core_types::fuel_crypto::Signature> {
        let signature = self.mode.sign(data).await?;
        Ok(signature)
    }

    /// Returns the signer's public key in `cosmrs` form.
    ///
    /// # Panics
    ///
    /// Panics if the verifying key lookup fails or yields no key.
    fn public_key(&self) -> cosmrs::crypto::PublicKey {
        let maybe_key = self.mode.verifying_key().expect("Invalid public key");
        let pubkey = maybe_key.expect("Public key not available");
        pubkey.into()
    }

    /// Reports whether the underlying signing mode is available.
    fn is_available(&self) -> bool {
        let mode = &self.mode;
        mode.is_available()
    }
}
550
/// P2P adapter used when the `p2p` feature is enabled.
#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct P2PAdapter {
    /// Optional shared state of the P2P service.
    service: Option<fuel_core_p2p::service::SharedState>,
    /// Scores used when reporting on peers.
    peer_report_config: PeerReportConfig,
}
557
/// One [`AppScore`] per peer-report event kind (only with the `p2p` feature).
///
/// NOTE(review): the field descriptions below are inferred from the field
/// names — confirm against the code that applies these scores.
#[cfg(feature = "p2p")]
#[derive(Clone, Default)]
pub struct PeerReportConfig {
    /// Score for a successful block import.
    pub successful_block_import: AppScore,
    /// Score for missing block headers.
    pub missing_block_headers: AppScore,
    /// Score for a bad block header.
    pub bad_block_header: AppScore,
    /// Score for missing transactions.
    pub missing_transactions: AppScore,
    /// Score for invalid transactions.
    pub invalid_transactions: AppScore,
}
567
/// Field-less stand-in for the P2P adapter when the `p2p` feature is
/// disabled.
#[cfg(not(feature = "p2p"))]
#[derive(Default, Clone)]
pub struct P2PAdapter;
571
#[cfg(feature = "p2p")]
impl P2PAdapter {
    /// Creates the adapter from an optional P2P shared state and the peer
    /// report configuration.
    pub fn new(
        service: Option<fuel_core_p2p::service::SharedState>,
        peer_report_config: PeerReportConfig,
    ) -> Self {
        P2PAdapter {
            peer_report_config,
            service,
        }
    }
}
584
585#[cfg(not(feature = "p2p"))]
586impl P2PAdapter {
587    pub fn new() -> Self {
588        Default::default()
589    }
590}
591
592#[derive(Clone)]
593pub struct SharedMemoryPool {
594    memory_pool: MemoryPool,
595}
596
597impl SharedMemoryPool {
598    pub fn new(number_of_instances: usize) -> Self {
599        Self {
600            memory_pool: MemoryPool::new(number_of_instances),
601        }
602    }
603}
604
605pub struct SystemTime;
606
607impl fuel_core_poa::ports::GetTime for SystemTime {
608    fn now(&self) -> Tai64 {
609        Tai64::now()
610    }
611}