fuel_core/service/
adapters.rs

1use std::{
2    ops::Deref,
3    sync::Arc,
4};
5use tokio::sync::{
6    mpsc,
7    watch,
8};
9
10use fuel_core_consensus_module::{
11    RelayerConsensusConfig,
12    block_verifier::Verifier,
13};
14use fuel_core_executor::executor::OnceTransactionsSource;
15use fuel_core_gas_price_service::{
16    common::cumulative_percentage_change,
17    v1::service::LatestGasPrice,
18};
19use fuel_core_importer::ImporterResult;
20use fuel_core_poa::ports::BlockSigner;
21use fuel_core_services::stream::BoxStream;
22use fuel_core_storage::transactional::Changes;
23use fuel_core_tx_status_manager::{
24    SharedData,
25    TxStatusStream,
26};
27use fuel_core_txpool::ports::GasPriceProvider as TxPoolGasPriceProvider;
28use fuel_core_types::{
29    blockchain::{
30        block::Block,
31        consensus::{
32            Consensus,
33            poa::PoAConsensus,
34        },
35    },
36    fuel_tx::{
37        Bytes32,
38        Transaction,
39    },
40    fuel_types::BlockHeight,
41    services::{
42        block_importer::SharedImportResult,
43        block_producer::Components,
44        executor::{
45            Result as ExecutorResult,
46            UncommittedResult,
47        },
48        preconfirmation::Preconfirmation,
49    },
50    signer::SignMode,
51    tai64::Tai64,
52};
53use fuel_core_upgradable_executor::executor::Executor;
54use tokio::time::Instant;
55
56#[cfg(feature = "p2p")]
57use fuel_core_types::services::p2p::peer_reputation::AppScore;
58
59use crate::{
60    database::{
61        Database,
62        database_description::relayer::Relayer,
63    },
64    fuel_core_graphql_api::ports::GasPriceEstimate,
65    service::{
66        sub_services::{
67            BlockProducerService,
68            TxPoolSharedState,
69        },
70        vm_pool::MemoryPool,
71    },
72};
73
74pub mod block_importer;
75pub mod chain_state_info_provider;
76pub mod compression_adapters;
77pub mod consensus_module;
78pub mod executor;
79pub mod fuel_gas_price_provider;
80pub mod gas_price_adapters;
81pub mod graphql_api;
82pub mod import_result_provider;
83#[cfg(feature = "p2p")]
84pub mod p2p;
85pub mod producer;
86pub mod ready_signal;
87#[cfg(feature = "relayer")]
88pub mod relayer;
89#[cfg(feature = "shared-sequencer")]
90pub mod shared_sequencer;
91#[cfg(feature = "p2p")]
92pub mod sync;
93pub mod tx_status_manager;
94pub mod txpool;
95
96#[derive(Debug, Clone)]
97pub struct ChainStateInfoProvider {
98    shared_state: chain_state_info_provider::SharedState,
99}
100
101impl ChainStateInfoProvider {
102    pub fn new(shared_state: chain_state_info_provider::SharedState) -> Self {
103        Self { shared_state }
104    }
105}
106
/// A gas price that is fixed at construction time.
#[derive(Debug, Clone)]
pub struct StaticGasPrice {
    /// The stored gas price.
    pub gas_price: u64,
}

impl StaticGasPrice {
    /// Creates a `StaticGasPrice` holding `gas_price`.
    pub fn new(gas_price: u64) -> Self {
        StaticGasPrice { gas_price }
    }
}
117
/// Property-based tests for the gas price estimates produced by
/// `UniversalGasPriceProvider<u32, u64>`.
#[cfg(test)]
mod universal_gas_price_provider_tests {
    #![allow(non_snake_case)]

    use proptest::proptest;

    use super::*;

    /// Raises `price` by `percentage` percent once, using saturating integer
    /// arithmetic. The tests use this as the reference value that every
    /// estimate must be greater than or equal to.
    fn compound_once(price: u64, percentage: u16) -> u64 {
        let increase = price.saturating_mul(percentage as u64).saturating_div(100);
        price.saturating_add(increase)
    }

    /// Checks that the worst-case estimate at `starting_height + block_horizon`
    /// is at least the price obtained by compounding once per block.
    fn _worst_case__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        block_horizon: u32,
        percentage: u16,
    ) {
        // given
        let provider =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);
        let target_height = starting_height.saturating_add(block_horizon);

        // when
        let estimated = provider
            .worst_case_gas_price(target_height.into())
            .unwrap();

        // then
        let reference = (0..block_horizon)
            .fold(gas_price, |price, _| compound_once(price, percentage));

        assert!(estimated >= reference);
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16,
        ) {
            _worst_case__correctly_calculates_value(
                gas_price,
                starting_height,
                block_horizon,
                percentage,
            );
        }
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__never_overflows(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16
        ) {
            // given
            let provider =
                UniversalGasPriceProvider::new(starting_height, gas_price, percentage);
            let target_height = starting_height.saturating_add(block_horizon);

            // when
            let _ = provider.worst_case_gas_price(target_height.into());

            // then
            // reaching this point means the call did not panic with an overflow
        }
    }

    /// Checks that the next-block estimate is at least the price obtained by
    /// compounding the starting price exactly once.
    fn _next_gas_price__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        percentage: u16,
    ) {
        // given
        let provider =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

        // when
        let estimated = provider.next_gas_price();

        // then
        let reference = compound_once(gas_price, percentage);

        assert!(estimated >= reference);
    }

    proptest! {
        #[test]
        fn next_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            percentage: u16,
        ) {
            _next_gas_price__correctly_calculates_value(
                gas_price,
                starting_height,
                percentage,
            );
        }
    }
}
226
227/// Allows communication from other service with more recent gas price data
228/// `Height` refers to the height of the block at which the gas price was last updated
229/// `GasPrice` refers to the gas price at the last updated block
230#[derive(Debug)]
231pub struct UniversalGasPriceProvider<Height, GasPrice> {
232    /// Shared state of latest gas price data
233    latest_gas_price: LatestGasPrice<Height, GasPrice>,
234    /// The max percentage the gas price can increase per block
235    percentage: u16,
236}
237
238impl<Height, GasPrice> Clone for UniversalGasPriceProvider<Height, GasPrice> {
239    fn clone(&self) -> Self {
240        Self {
241            latest_gas_price: self.latest_gas_price.clone(),
242            percentage: self.percentage,
243        }
244    }
245}
246
247impl<Height, GasPrice> UniversalGasPriceProvider<Height, GasPrice> {
248    #[cfg(test)]
249    pub fn new(height: Height, price: GasPrice, percentage: u16) -> Self {
250        let latest_gas_price = LatestGasPrice::new(height, price);
251        Self {
252            latest_gas_price,
253            percentage,
254        }
255    }
256
257    pub fn new_from_inner(
258        inner: LatestGasPrice<Height, GasPrice>,
259        percentage: u16,
260    ) -> Self {
261        Self {
262            latest_gas_price: inner,
263            percentage,
264        }
265    }
266}
267
268impl<Height: Copy, GasPrice: Copy> UniversalGasPriceProvider<Height, GasPrice> {
269    fn get_height_and_gas_price(&self) -> (Height, GasPrice) {
270        self.latest_gas_price.get()
271    }
272}
273
274impl UniversalGasPriceProvider<u32, u64> {
275    pub fn inner_next_gas_price(&self) -> u64 {
276        let (_, latest_price) = self.get_height_and_gas_price();
277        let percentage = self.percentage;
278
279        let change = latest_price
280            .saturating_mul(percentage as u64)
281            .saturating_div(100);
282
283        latest_price.saturating_add(change)
284    }
285}
286
287impl TxPoolGasPriceProvider for UniversalGasPriceProvider<u32, u64> {
288    fn next_gas_price(&self) -> fuel_core_txpool::GasPrice {
289        self.inner_next_gas_price()
290    }
291}
292
293impl GasPriceEstimate for UniversalGasPriceProvider<u32, u64> {
294    fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
295        let (best_height, best_gas_price) = self.get_height_and_gas_price();
296        let percentage = self.percentage;
297
298        let worst = cumulative_percentage_change(
299            best_gas_price,
300            best_height,
301            percentage as u64,
302            height.into(),
303        );
304        Some(worst)
305    }
306}
307
308#[derive(Clone)]
309pub struct PoAAdapter {
310    shared_state: Option<fuel_core_poa::service::SharedState>,
311}
312
313#[derive(Clone)]
314pub struct TxPoolAdapter {
315    service: TxPoolSharedState,
316}
317
318impl TxPoolAdapter {
319    pub fn new(service: TxPoolSharedState) -> Self {
320        Self { service }
321    }
322}
323
324pub struct TransactionsSource {
325    tx_pool: TxPoolSharedState,
326    minimum_gas_price: u64,
327}
328
329impl TransactionsSource {
330    pub fn new(minimum_gas_price: u64, tx_pool: TxPoolSharedState) -> Self {
331        Self {
332            tx_pool,
333            minimum_gas_price,
334        }
335    }
336}
337
338pub struct NewTxWaiter {
339    pub receiver: watch::Receiver<()>,
340    pub timeout: Instant,
341}
342
343impl NewTxWaiter {
344    pub fn new(receiver: watch::Receiver<()>, timeout: Instant) -> Self {
345        Self { receiver, timeout }
346    }
347}
348
349#[derive(Clone)]
350pub struct PreconfirmationSender {
351    pub sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
352    pub tx_status_manager_adapter: TxStatusManagerAdapter,
353}
354
355impl PreconfirmationSender {
356    pub fn new(
357        sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
358        tx_status_manager_adapter: TxStatusManagerAdapter,
359    ) -> Self {
360        Self {
361            sender_signature_service,
362            tx_status_manager_adapter,
363        }
364    }
365}
366
367#[derive(Clone)]
368pub struct ExecutorAdapter {
369    pub(crate) executor: Arc<Executor<Database, Database<Relayer>>>,
370    pub new_txs_watcher: watch::Receiver<()>,
371    pub preconfirmation_sender: PreconfirmationSender,
372}
373
374impl ExecutorAdapter {
375    pub fn new(
376        database: Database,
377        relayer_database: Database<Relayer>,
378        config: fuel_core_upgradable_executor::config::Config,
379        new_txs_watcher: watch::Receiver<()>,
380        preconfirmation_sender: PreconfirmationSender,
381    ) -> Self {
382        let executor = Executor::new(database, relayer_database, config);
383        Self {
384            executor: Arc::new(executor),
385            new_txs_watcher,
386            preconfirmation_sender,
387        }
388    }
389
390    pub fn produce_without_commit_from_vector(
391        &self,
392        component: Components<Vec<Transaction>>,
393    ) -> ExecutorResult<UncommittedResult<Changes>> {
394        let new_components = Components {
395            header_to_produce: component.header_to_produce,
396            transactions_source: OnceTransactionsSource::new(
397                component.transactions_source,
398            ),
399            gas_price: component.gas_price,
400            coinbase_recipient: component.coinbase_recipient,
401        };
402
403        self.executor
404            .produce_without_commit_with_source_direct_resolve(new_components)
405    }
406}
407
408#[derive(Clone)]
409pub struct VerifierAdapter {
410    pub block_verifier: Arc<Verifier<Database>>,
411}
412
413#[derive(Clone)]
414pub struct ConsensusAdapter {
415    pub block_verifier: Arc<Verifier<Database>>,
416    pub config: RelayerConsensusConfig,
417    pub maybe_relayer: MaybeRelayerAdapter,
418}
419
420impl ConsensusAdapter {
421    pub fn new(
422        block_verifier: VerifierAdapter,
423        config: RelayerConsensusConfig,
424        maybe_relayer: MaybeRelayerAdapter,
425    ) -> Self {
426        Self {
427            block_verifier: block_verifier.block_verifier,
428            config,
429            maybe_relayer,
430        }
431    }
432}
433
/// Relayer adapter whose fields all exist only when the `relayer` feature is
/// enabled; without that feature the struct has no fields.
#[derive(Clone)]
pub struct MaybeRelayerAdapter {
    /// Optional shared state of the relayer service.
    #[cfg(feature = "relayer")]
    pub relayer_synced: Option<fuel_core_relayer::SharedState>,
    /// Database holding relayer data.
    #[cfg(feature = "relayer")]
    pub relayer_database: Database<Relayer>,
    /// DA block height stored as the deploy height.
    #[cfg(feature = "relayer")]
    pub da_deploy_height: fuel_core_types::blockchain::primitives::DaBlockHeight,
}
443
444#[derive(Clone)]
445pub struct BlockProducerAdapter {
446    pub block_producer: Arc<BlockProducerService>,
447}
448
449#[derive(Clone)]
450pub struct BlockImporterAdapter {
451    pub block_importer: Arc<fuel_core_importer::Importer>,
452}
453
454impl BlockImporterAdapter {
455    pub fn events(&self) -> BoxStream<ImporterResult> {
456        use futures::StreamExt;
457        fuel_core_services::stream::IntoBoxStream::into_boxed(
458            tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
459                .filter_map(|r| futures::future::ready(r.ok())),
460        )
461    }
462
463    pub fn events_shared_result(&self) -> BoxStream<SharedImportResult> {
464        use futures::StreamExt;
465        fuel_core_services::stream::IntoBoxStream::into_boxed(
466            tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
467                .filter_map(|r| futures::future::ready(r.ok()))
468                .map(|r| r.shared_result),
469        )
470    }
471}
472
473#[derive(Clone)]
474pub struct TxStatusManagerAdapter {
475    tx_status_manager_shared_data: SharedData,
476}
477
478impl Deref for TxStatusManagerAdapter {
479    type Target = SharedData;
480
481    fn deref(&self) -> &Self::Target {
482        &self.tx_status_manager_shared_data
483    }
484}
485
486impl TxStatusManagerAdapter {
487    pub fn new(tx_status_manager_shared_data: SharedData) -> Self {
488        Self {
489            tx_status_manager_shared_data,
490        }
491    }
492
493    /// Subscribe to status updates for a transaction.
494    pub async fn tx_update_subscribe(
495        &self,
496        tx_id: Bytes32,
497    ) -> anyhow::Result<TxStatusStream> {
498        self.tx_status_manager_shared_data.subscribe(tx_id).await
499    }
500}
501
502#[derive(Clone)]
503pub struct FuelBlockSigner {
504    mode: SignMode,
505}
506impl FuelBlockSigner {
507    pub fn new(mode: SignMode) -> Self {
508        Self { mode }
509    }
510}
511
512#[async_trait::async_trait]
513impl BlockSigner for FuelBlockSigner {
514    async fn seal_block(&self, block: &Block) -> anyhow::Result<Consensus> {
515        let block_hash = block.id();
516        let message = block_hash.into_message();
517        let signature = self.mode.sign_message(message).await?;
518        Ok(Consensus::PoA(PoAConsensus::new(signature)))
519    }
520
521    fn is_available(&self) -> bool {
522        self.mode.is_available()
523    }
524}
525
#[cfg(feature = "shared-sequencer")]
#[async_trait::async_trait]
impl fuel_core_shared_sequencer::ports::Signer for FuelBlockSigner {
    /// Signs `data` with the underlying sign mode.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `SignMode::sign`.
    async fn sign(
        &self,
        data: &[u8],
    ) -> anyhow::Result<fuel_core_types::fuel_crypto::Signature> {
        let signature = self.mode.sign(data).await?;
        Ok(signature)
    }

    /// Returns the sign mode's verifying key converted into a `cosmrs`
    /// public key.
    ///
    /// # Panics
    ///
    /// Panics if either of the two `expect` calls on the value returned by
    /// `SignMode::verifying_key` fails.
    fn public_key(&self) -> cosmrs::crypto::PublicKey {
        let maybe_key = self.mode.verifying_key().expect("Invalid public key");
        let pubkey = maybe_key.expect("Public key not available");
        cosmrs::crypto::PublicKey::from(pubkey)
    }

    /// Reports whether the underlying sign mode is available.
    fn is_available(&self) -> bool {
        self.mode.is_available()
    }
}
549
/// P2P adapter used when the `p2p` feature is enabled: an optional handle to
/// the P2P service plus the peer report configuration.
#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct P2PAdapter {
    /// Shared state of the P2P service, if one is present.
    service: Option<fuel_core_p2p::service::SharedState>,
    /// Scores used when reporting peers.
    peer_report_config: PeerReportConfig,
}

#[cfg(feature = "p2p")]
impl P2PAdapter {
    /// Creates the adapter from an optional service handle and the peer
    /// report configuration.
    pub fn new(
        service: Option<fuel_core_p2p::service::SharedState>,
        peer_report_config: PeerReportConfig,
    ) -> Self {
        P2PAdapter {
            service,
            peer_report_config,
        }
    }
}

/// One `AppScore` per kind of peer report.
#[cfg(feature = "p2p")]
#[derive(Clone, Default)]
pub struct PeerReportConfig {
    pub successful_block_import: AppScore,
    pub missing_block_headers: AppScore,
    pub bad_block_header: AppScore,
    pub missing_transactions: AppScore,
    pub invalid_transactions: AppScore,
}

/// Field-less stand-in for the P2P adapter used when the `p2p` feature is
/// disabled.
#[cfg(not(feature = "p2p"))]
#[derive(Default, Clone)]
pub struct P2PAdapter;

#[cfg(not(feature = "p2p"))]
impl P2PAdapter {
    /// Creates the field-less adapter.
    pub fn new() -> Self {
        Self
    }
}
590
591#[derive(Clone)]
592pub struct SharedMemoryPool {
593    memory_pool: MemoryPool,
594}
595
596impl SharedMemoryPool {
597    pub fn new(number_of_instances: usize) -> Self {
598        Self {
599            memory_pool: MemoryPool::new(number_of_instances),
600        }
601    }
602}
603
604pub struct SystemTime;
605
606impl fuel_core_poa::ports::GetTime for SystemTime {
607    fn now(&self) -> Tai64 {
608        Tai64::now()
609    }
610}