1use std::{
2 ops::Deref,
3 sync::Arc,
4};
5use tokio::sync::{
6 mpsc,
7 watch,
8};
9
10use fuel_core_consensus_module::{
11 RelayerConsensusConfig,
12 block_verifier::Verifier,
13};
14use fuel_core_executor::executor::OnceTransactionsSource;
15use fuel_core_gas_price_service::{
16 common::cumulative_percentage_change,
17 v1::service::LatestGasPrice,
18};
19use fuel_core_importer::ImporterResult;
20use fuel_core_poa::ports::BlockSigner;
21use fuel_core_services::stream::BoxStream;
22use fuel_core_storage::transactional::Changes;
23use fuel_core_tx_status_manager::{
24 SharedData,
25 TxStatusStream,
26};
27use fuel_core_txpool::ports::GasPriceProvider as TxPoolGasPriceProvider;
28use fuel_core_types::{
29 blockchain::{
30 block::Block,
31 consensus::{
32 Consensus,
33 poa::PoAConsensus,
34 },
35 },
36 fuel_tx::{
37 Bytes32,
38 Transaction,
39 },
40 fuel_types::BlockHeight,
41 services::{
42 block_importer::SharedImportResult,
43 block_producer::Components,
44 executor::{
45 Result as ExecutorResult,
46 UncommittedResult,
47 },
48 preconfirmation::Preconfirmation,
49 },
50 signer::SignMode,
51 tai64::Tai64,
52};
53use fuel_core_upgradable_executor::executor::Executor;
54use tokio::time::Instant;
55
56#[cfg(feature = "p2p")]
57use fuel_core_types::services::p2p::peer_reputation::AppScore;
58
59use crate::{
60 database::{
61 Database,
62 database_description::relayer::Relayer,
63 },
64 fuel_core_graphql_api::ports::GasPriceEstimate,
65 service::{
66 sub_services::{
67 BlockProducerService,
68 TxPoolSharedState,
69 },
70 vm_pool::MemoryPool,
71 },
72};
73
// Submodules of this module. Several of them are only compiled when the
// matching cargo feature (`p2p`, `relayer`, `shared-sequencer`) is enabled.
pub mod block_importer;
pub mod chain_state_info_provider;
pub mod compression_adapters;
pub mod consensus_module;
pub mod executor;
pub mod fuel_gas_price_provider;
pub mod gas_price_adapters;
pub mod graphql_api;
pub mod import_result_provider;
// Only available with the `p2p` feature.
#[cfg(feature = "p2p")]
pub mod p2p;
pub mod producer;
pub mod ready_signal;
// Only available with the `relayer` feature.
#[cfg(feature = "relayer")]
pub mod relayer;
// Only available with the `shared-sequencer` feature.
#[cfg(feature = "shared-sequencer")]
pub mod shared_sequencer;
// Only available with the `p2p` feature.
#[cfg(feature = "p2p")]
pub mod sync;
pub mod tx_status_manager;
pub mod txpool;
95
96#[derive(Debug, Clone)]
97pub struct ChainStateInfoProvider {
98 shared_state: chain_state_info_provider::SharedState,
99}
100
101impl ChainStateInfoProvider {
102 pub fn new(shared_state: chain_state_info_provider::SharedState) -> Self {
103 Self { shared_state }
104 }
105}
106
/// Holder for a single, fixed gas price value.
#[derive(Debug, Clone)]
pub struct StaticGasPrice {
    /// The stored gas price.
    pub gas_price: u64,
}

impl StaticGasPrice {
    /// Creates a `StaticGasPrice` carrying `gas_price`.
    pub fn new(gas_price: u64) -> Self {
        StaticGasPrice { gas_price }
    }
}
117
/// Property tests for `UniversalGasPriceProvider<u32, u64>`.
#[cfg(test)]
mod universal_gas_price_provider_tests {
    // Test names use the `subject__expectation` double-underscore convention.
    #![allow(non_snake_case)]

    use proptest::proptest;

    use super::*;

    /// Checks that the worst-case estimate for `starting_height + block_horizon`
    /// is never below a reference value obtained by compounding `percentage`
    /// percent per block with saturating integer arithmetic.
    fn _worst_case__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        block_horizon: u32,
        percentage: u16,
    ) {
        // given
        let subject =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

        // when
        let target_height = starting_height.saturating_add(block_horizon);
        let estimated = subject.worst_case_gas_price(target_height.into()).unwrap();

        // then
        // `target_height` is clamped at `u32::MAX`, and the provider is only ever
        // given `target_height`. The reference value therefore has to compound
        // over the number of blocks that really lie between the two heights;
        // using the raw `block_horizon` would over-count whenever the addition
        // above saturated and make the assertion fail spuriously.
        let effective_horizon = target_height.saturating_sub(starting_height);
        let mut actual = gas_price;

        for _ in 0..effective_horizon {
            let change_amount =
                actual.saturating_mul(percentage as u64).saturating_div(100);
            actual = actual.saturating_add(change_amount);
        }

        assert!(estimated >= actual);
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16,
        ) {
            _worst_case__correctly_calculates_value(
                gas_price,
                starting_height,
                block_horizon,
                percentage,
            );
        }
    }

    proptest! {
        #[test]
        fn worst_case_gas_price__never_overflows(
            gas_price: u64,
            starting_height: u32,
            block_horizon in 0..10_000u32,
            percentage: u16
        ) {
            // given
            let subject = UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

            // when
            let target_height = starting_height.saturating_add(block_horizon);

            // then
            // The test passes as long as this call does not panic.
            let _ = subject.worst_case_gas_price(target_height.into());

        }
    }

    /// Checks that the next-block price is never below the latest price grown
    /// by `percentage` percent (computed with saturating integer arithmetic).
    fn _next_gas_price__correctly_calculates_value(
        gas_price: u64,
        starting_height: u32,
        percentage: u16,
    ) {
        // given
        let subject =
            UniversalGasPriceProvider::new(starting_height, gas_price, percentage);

        // when
        let estimated = subject.next_gas_price();

        // then
        let change_amount = gas_price
            .saturating_mul(percentage as u64)
            .saturating_div(100);
        let actual = gas_price.saturating_add(change_amount);

        assert!(estimated >= actual);
    }

    proptest! {
        #[test]
        fn next_gas_price__correctly_calculates_value(
            gas_price: u64,
            starting_height: u32,
            percentage: u16,
        ) {
            _next_gas_price__correctly_calculates_value(
                gas_price,
                starting_height,
                percentage,
            )
        }
    }
}
226
227#[derive(Debug)]
231pub struct UniversalGasPriceProvider<Height, GasPrice> {
232 latest_gas_price: LatestGasPrice<Height, GasPrice>,
234 percentage: u16,
236}
237
238impl<Height, GasPrice> Clone for UniversalGasPriceProvider<Height, GasPrice> {
239 fn clone(&self) -> Self {
240 Self {
241 latest_gas_price: self.latest_gas_price.clone(),
242 percentage: self.percentage,
243 }
244 }
245}
246
247impl<Height, GasPrice> UniversalGasPriceProvider<Height, GasPrice> {
248 #[cfg(test)]
249 pub fn new(height: Height, price: GasPrice, percentage: u16) -> Self {
250 let latest_gas_price = LatestGasPrice::new(height, price);
251 Self {
252 latest_gas_price,
253 percentage,
254 }
255 }
256
257 pub fn new_from_inner(
258 inner: LatestGasPrice<Height, GasPrice>,
259 percentage: u16,
260 ) -> Self {
261 Self {
262 latest_gas_price: inner,
263 percentage,
264 }
265 }
266}
267
268impl<Height: Copy, GasPrice: Copy> UniversalGasPriceProvider<Height, GasPrice> {
269 fn get_height_and_gas_price(&self) -> (Height, GasPrice) {
270 self.latest_gas_price.get()
271 }
272}
273
274impl UniversalGasPriceProvider<u32, u64> {
275 pub fn inner_next_gas_price(&self) -> u64 {
276 let (_, latest_price) = self.get_height_and_gas_price();
277 let percentage = self.percentage;
278
279 let change = latest_price
280 .saturating_mul(percentage as u64)
281 .saturating_div(100);
282
283 latest_price.saturating_add(change)
284 }
285}
286
287impl TxPoolGasPriceProvider for UniversalGasPriceProvider<u32, u64> {
288 fn next_gas_price(&self) -> fuel_core_txpool::GasPrice {
289 self.inner_next_gas_price()
290 }
291}
292
293impl GasPriceEstimate for UniversalGasPriceProvider<u32, u64> {
294 fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
295 let (best_height, best_gas_price) = self.get_height_and_gas_price();
296 let percentage = self.percentage;
297
298 let worst = cumulative_percentage_change(
299 best_gas_price,
300 best_height,
301 percentage as u64,
302 height.into(),
303 );
304 Some(worst)
305 }
306}
307
/// Adapter holding an optional handle to the PoA service's shared state.
#[derive(Clone)]
pub struct PoAAdapter {
    /// Shared state of the PoA service, if any.
    // NOTE(review): `None` presumably means no PoA service is running —
    // confirm at the construction site.
    shared_state: Option<fuel_core_poa::service::SharedState>,
}
312
313#[derive(Clone)]
314pub struct TxPoolAdapter {
315 service: TxPoolSharedState,
316}
317
318impl TxPoolAdapter {
319 pub fn new(service: TxPoolSharedState) -> Self {
320 Self { service }
321 }
322}
323
324pub struct TransactionsSource {
325 tx_pool: TxPoolSharedState,
326 minimum_gas_price: u64,
327}
328
329impl TransactionsSource {
330 pub fn new(minimum_gas_price: u64, tx_pool: TxPoolSharedState) -> Self {
331 Self {
332 tx_pool,
333 minimum_gas_price,
334 }
335 }
336}
337
338pub struct NewTxWaiter {
339 pub receiver: watch::Receiver<()>,
340 pub timeout: Instant,
341}
342
343impl NewTxWaiter {
344 pub fn new(receiver: watch::Receiver<()>, timeout: Instant) -> Self {
345 Self { receiver, timeout }
346 }
347}
348
349#[derive(Clone)]
350pub struct PreconfirmationSender {
351 pub sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
352 pub tx_status_manager_adapter: TxStatusManagerAdapter,
353}
354
355impl PreconfirmationSender {
356 pub fn new(
357 sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
358 tx_status_manager_adapter: TxStatusManagerAdapter,
359 ) -> Self {
360 Self {
361 sender_signature_service,
362 tx_status_manager_adapter,
363 }
364 }
365}
366
367#[derive(Clone)]
368pub struct ExecutorAdapter {
369 pub(crate) executor: Arc<Executor<Database, Database<Relayer>>>,
370 pub new_txs_watcher: watch::Receiver<()>,
371 pub preconfirmation_sender: PreconfirmationSender,
372}
373
374impl ExecutorAdapter {
375 pub fn new(
376 database: Database,
377 relayer_database: Database<Relayer>,
378 config: fuel_core_upgradable_executor::config::Config,
379 new_txs_watcher: watch::Receiver<()>,
380 preconfirmation_sender: PreconfirmationSender,
381 ) -> Self {
382 let executor = Executor::new(database, relayer_database, config);
383 Self {
384 executor: Arc::new(executor),
385 new_txs_watcher,
386 preconfirmation_sender,
387 }
388 }
389
390 pub fn produce_without_commit_from_vector(
391 &self,
392 component: Components<Vec<Transaction>>,
393 ) -> ExecutorResult<UncommittedResult<Changes>> {
394 let new_components = Components {
395 header_to_produce: component.header_to_produce,
396 transactions_source: OnceTransactionsSource::new(
397 component.transactions_source,
398 ),
399 gas_price: component.gas_price,
400 coinbase_recipient: component.coinbase_recipient,
401 };
402
403 self.executor
404 .produce_without_commit_with_source_direct_resolve(new_components)
405 }
406}
407
/// Cloneable handle to a block `Verifier` over `Database`.
#[derive(Clone)]
pub struct VerifierAdapter {
    /// The verifier, shared between clones through an `Arc`.
    pub block_verifier: Arc<Verifier<Database>>,
}
412
413#[derive(Clone)]
414pub struct ConsensusAdapter {
415 pub block_verifier: Arc<Verifier<Database>>,
416 pub config: RelayerConsensusConfig,
417 pub maybe_relayer: MaybeRelayerAdapter,
418}
419
420impl ConsensusAdapter {
421 pub fn new(
422 block_verifier: VerifierAdapter,
423 config: RelayerConsensusConfig,
424 maybe_relayer: MaybeRelayerAdapter,
425 ) -> Self {
426 Self {
427 block_verifier: block_verifier.block_verifier,
428 config,
429 maybe_relayer,
430 }
431 }
432}
433
/// Relayer-related state that only exists when the `relayer` feature is
/// enabled.
///
/// Every field is feature-gated, so without the feature this struct has no
/// fields at all.
#[derive(Clone)]
pub struct MaybeRelayerAdapter {
    /// Optional shared state of the relayer service.
    #[cfg(feature = "relayer")]
    pub relayer_synced: Option<fuel_core_relayer::SharedState>,
    /// Database using the `Relayer` description.
    #[cfg(feature = "relayer")]
    pub relayer_database: Database<Relayer>,
    /// DA block height.
    // NOTE(review): presumably the DA height at which the bridge contract was
    // deployed — confirm against the configuration that fills it in.
    #[cfg(feature = "relayer")]
    pub da_deploy_height: fuel_core_types::blockchain::primitives::DaBlockHeight,
}
443
/// Cloneable handle to the block producer service.
#[derive(Clone)]
pub struct BlockProducerAdapter {
    /// The block producer, shared between clones through an `Arc`.
    pub block_producer: Arc<BlockProducerService>,
}
448
449#[derive(Clone)]
450pub struct BlockImporterAdapter {
451 pub block_importer: Arc<fuel_core_importer::Importer>,
452}
453
454impl BlockImporterAdapter {
455 pub fn events(&self) -> BoxStream<ImporterResult> {
456 use futures::StreamExt;
457 fuel_core_services::stream::IntoBoxStream::into_boxed(
458 tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
459 .filter_map(|r| futures::future::ready(r.ok())),
460 )
461 }
462
463 pub fn events_shared_result(&self) -> BoxStream<SharedImportResult> {
464 use futures::StreamExt;
465 fuel_core_services::stream::IntoBoxStream::into_boxed(
466 tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
467 .filter_map(|r| futures::future::ready(r.ok()))
468 .map(|r| r.shared_result),
469 )
470 }
471}
472
473#[derive(Clone)]
474pub struct TxStatusManagerAdapter {
475 tx_status_manager_shared_data: SharedData,
476}
477
478impl Deref for TxStatusManagerAdapter {
479 type Target = SharedData;
480
481 fn deref(&self) -> &Self::Target {
482 &self.tx_status_manager_shared_data
483 }
484}
485
486impl TxStatusManagerAdapter {
487 pub fn new(tx_status_manager_shared_data: SharedData) -> Self {
488 Self {
489 tx_status_manager_shared_data,
490 }
491 }
492
493 pub async fn tx_update_subscribe(
495 &self,
496 tx_id: Bytes32,
497 ) -> anyhow::Result<TxStatusStream> {
498 self.tx_status_manager_shared_data.subscribe(tx_id).await
499 }
500}
501
502#[derive(Clone)]
503pub struct FuelBlockSigner {
504 mode: SignMode,
505}
506impl FuelBlockSigner {
507 pub fn new(mode: SignMode) -> Self {
508 Self { mode }
509 }
510}
511
512#[async_trait::async_trait]
513impl BlockSigner for FuelBlockSigner {
514 async fn seal_block(&self, block: &Block) -> anyhow::Result<Consensus> {
515 let block_hash = block.id();
516 let message = block_hash.into_message();
517 let signature = self.mode.sign_message(message).await?;
518 Ok(Consensus::PoA(PoAConsensus::new(signature)))
519 }
520
521 fn is_available(&self) -> bool {
522 self.mode.is_available()
523 }
524}
525
// Shared sequencer signer backed by the same `SignMode` as block signing.
// Only compiled with the `shared-sequencer` feature.
#[cfg(feature = "shared-sequencer")]
#[async_trait::async_trait]
impl fuel_core_shared_sequencer::ports::Signer for FuelBlockSigner {
    /// Signs `data` with the underlying signing mode.
    ///
    /// # Errors
    /// Any error from `SignMode::sign` is converted into `anyhow::Error`.
    async fn sign(
        &self,
        data: &[u8],
    ) -> anyhow::Result<fuel_core_types::fuel_crypto::Signature> {
        Ok(self.mode.sign(data).await?)
    }

    /// Returns the verifying key of the signing mode as a cosmrs public key.
    ///
    /// # Panics
    /// Panics with "Invalid public key" or "Public key not available" when
    /// either layer of the value returned by `verifying_key()` cannot be
    /// unwrapped.
    fn public_key(&self) -> cosmrs::crypto::PublicKey {
        let pubkey = self
            .mode
            .verifying_key()
            .expect("Invalid public key")
            .expect("Public key not available");
        cosmrs::crypto::PublicKey::from(pubkey)
    }

    /// Reports whether the underlying signing mode is available.
    fn is_available(&self) -> bool {
        self.mode.is_available()
    }
}
549
/// P2P adapter used when the `p2p` feature is enabled.
#[cfg(feature = "p2p")]
#[derive(Clone)]
pub struct P2PAdapter {
    /// Shared state of the P2P service, if any.
    service: Option<fuel_core_p2p::service::SharedState>,
    /// Scores used when reporting peers.
    peer_report_config: PeerReportConfig,
}
556
/// One `AppScore` value per kind of peer-related event.
// NOTE(review): presumably each value is the score adjustment reported for a
// peer when the named event occurs — verify in the p2p adapter.
#[cfg(feature = "p2p")]
#[derive(Clone, Default)]
pub struct PeerReportConfig {
    /// Score for a successful block import.
    pub successful_block_import: AppScore,
    /// Score for missing block headers.
    pub missing_block_headers: AppScore,
    /// Score for a bad block header.
    pub bad_block_header: AppScore,
    /// Score for missing transactions.
    pub missing_transactions: AppScore,
    /// Score for invalid transactions.
    pub invalid_transactions: AppScore,
}
566
/// Field-less stand-in for the P2P adapter, compiled when the `p2p` feature
/// is disabled.
#[cfg(not(feature = "p2p"))]
#[derive(Default, Clone)]
pub struct P2PAdapter;
570
#[cfg(feature = "p2p")]
impl P2PAdapter {
    /// Creates the adapter from an optional P2P shared state and the peer
    /// report configuration.
    pub fn new(
        service: Option<fuel_core_p2p::service::SharedState>,
        peer_report_config: PeerReportConfig,
    ) -> Self {
        P2PAdapter {
            peer_report_config,
            service,
        }
    }
}
583
584#[cfg(not(feature = "p2p"))]
585impl P2PAdapter {
586 pub fn new() -> Self {
587 Default::default()
588 }
589}
590
591#[derive(Clone)]
592pub struct SharedMemoryPool {
593 memory_pool: MemoryPool,
594}
595
596impl SharedMemoryPool {
597 pub fn new(number_of_instances: usize) -> Self {
598 Self {
599 memory_pool: MemoryPool::new(number_of_instances),
600 }
601 }
602}
603
/// Time source for the PoA `GetTime` port that reads the time from
/// `Tai64::now()`.
pub struct SystemTime;

impl fuel_core_poa::ports::GetTime for SystemTime {
    /// Returns the current time as reported by `Tai64::now()`.
    fn now(&self) -> Tai64 {
        Tai64::now()
    }
}