1use std::{
2 ops::Deref,
3 sync::Arc,
4};
5use tokio::sync::{
6 mpsc,
7 watch,
8};
9
10use fuel_core_consensus_module::{
11 RelayerConsensusConfig,
12 block_verifier::Verifier,
13};
14use fuel_core_executor::executor::OnceTransactionsSource;
15use fuel_core_gas_price_service::{
16 common::cumulative_percentage_change,
17 v1::service::LatestGasPrice,
18};
19use fuel_core_importer::ImporterResult;
20use fuel_core_poa::ports::BlockSigner;
21use fuel_core_services::stream::BoxStream;
22use fuel_core_storage::transactional::Changes;
23use fuel_core_tx_status_manager::{
24 SharedData,
25 TxStatusStream,
26};
27use fuel_core_txpool::ports::GasPriceProvider as TxPoolGasPriceProvider;
28use fuel_core_types::{
29 blockchain::{
30 block::Block,
31 consensus::{
32 Consensus,
33 poa::PoAConsensus,
34 },
35 },
36 fuel_tx::{
37 Bytes32,
38 Transaction,
39 },
40 fuel_types::BlockHeight,
41 services::{
42 block_importer::SharedImportResult,
43 block_producer::Components,
44 executor::{
45 Result as ExecutorResult,
46 UncommittedResult,
47 },
48 preconfirmation::Preconfirmation,
49 },
50 signer::SignMode,
51 tai64::Tai64,
52};
53use fuel_core_upgradable_executor::executor::Executor;
54use tokio::time::Instant;
55
56#[cfg(feature = "p2p")]
57use fuel_core_types::services::p2p::peer_reputation::AppScore;
58
59use crate::{
60 database::{
61 Database,
62 database_description::relayer::Relayer,
63 },
64 fuel_core_graphql_api::ports::GasPriceEstimate,
65 service::{
66 sub_services::{
67 BlockProducerService,
68 TxPoolSharedState,
69 },
70 vm_pool::MemoryPool,
71 },
72};
73
74pub mod block_importer;
75pub mod chain_state_info_provider;
76pub mod compression_adapters;
77pub mod consensus_module;
78pub mod executor;
79pub mod fuel_gas_price_provider;
80pub mod gas_price_adapters;
81pub mod graphql_api;
82pub mod import_result_provider;
83
84#[cfg(feature = "p2p")]
85pub mod p2p;
86pub mod producer;
87pub mod ready_signal;
88#[cfg(feature = "relayer")]
89pub mod relayer;
90#[cfg(feature = "rpc")]
91pub mod rpc;
92#[cfg(feature = "shared-sequencer")]
93pub mod shared_sequencer;
94#[cfg(feature = "p2p")]
95pub mod sync;
96pub mod tx_status_manager;
97pub mod txpool;
98
99#[derive(Debug, Clone)]
100pub struct ChainStateInfoProvider {
101 shared_state: chain_state_info_provider::SharedState,
102}
103
104impl ChainStateInfoProvider {
105 pub fn new(shared_state: chain_state_info_provider::SharedState) -> Self {
106 Self { shared_state }
107 }
108}
109
110#[derive(Debug, Clone)]
111pub struct StaticGasPrice {
112 pub gas_price: u64,
113}
114
115impl StaticGasPrice {
116 pub fn new(gas_price: u64) -> Self {
117 Self { gas_price }
118 }
119}
120
121#[cfg(test)]
122mod universal_gas_price_provider_tests {
123 #![allow(non_snake_case)]
124
125 use proptest::proptest;
126
127 use super::*;
128
129 fn _worst_case__correctly_calculates_value(
130 gas_price: u64,
131 starting_height: u32,
132 block_horizon: u32,
133 percentage: u16,
134 ) {
135 let subject =
137 UniversalGasPriceProvider::new(starting_height, gas_price, percentage);
138
139 let target_height = starting_height.saturating_add(block_horizon);
141 let estimated = subject.worst_case_gas_price(target_height.into()).unwrap();
142
143 let mut actual = gas_price;
145
146 for _ in 0..block_horizon {
147 let change_amount =
148 actual.saturating_mul(percentage as u64).saturating_div(100);
149 actual = actual.saturating_add(change_amount);
150 }
151
152 assert!(estimated >= actual);
153 }
154
155 proptest! {
156 #[test]
157 fn worst_case_gas_price__correctly_calculates_value(
158 gas_price: u64,
159 starting_height: u32,
160 block_horizon in 0..10_000u32,
161 percentage: u16,
162 ) {
163 _worst_case__correctly_calculates_value(
164 gas_price,
165 starting_height,
166 block_horizon,
167 percentage,
168 );
169 }
170 }
171
172 proptest! {
173 #[test]
174 fn worst_case_gas_price__never_overflows(
175 gas_price: u64,
176 starting_height: u32,
177 block_horizon in 0..10_000u32,
178 percentage: u16
179 ) {
180 let subject = UniversalGasPriceProvider::new(starting_height, gas_price, percentage);
182
183 let target_height = starting_height.saturating_add(block_horizon);
185
186 let _ = subject.worst_case_gas_price(target_height.into());
187
188 }
191 }
192
193 fn _next_gas_price__correctly_calculates_value(
194 gas_price: u64,
195 starting_height: u32,
196 percentage: u16,
197 ) {
198 let subject =
200 UniversalGasPriceProvider::new(starting_height, gas_price, percentage);
201
202 let estimated = subject.next_gas_price();
204
205 let change_amount = gas_price
207 .saturating_mul(percentage as u64)
208 .saturating_div(100);
209 let actual = gas_price.saturating_add(change_amount);
210
211 assert!(estimated >= actual);
212 }
213
214 proptest! {
215 #[test]
216 fn next_gas_price__correctly_calculates_value(
217 gas_price: u64,
218 starting_height: u32,
219 percentage: u16,
220 ) {
221 _next_gas_price__correctly_calculates_value(
222 gas_price,
223 starting_height,
224 percentage,
225 )
226 }
227 }
228}
229
230#[derive(Debug)]
234pub struct UniversalGasPriceProvider<Height, GasPrice> {
235 latest_gas_price: LatestGasPrice<Height, GasPrice>,
237 percentage: u16,
239}
240
241impl<Height, GasPrice> Clone for UniversalGasPriceProvider<Height, GasPrice> {
242 fn clone(&self) -> Self {
243 Self {
244 latest_gas_price: self.latest_gas_price.clone(),
245 percentage: self.percentage,
246 }
247 }
248}
249
250impl<Height, GasPrice> UniversalGasPriceProvider<Height, GasPrice> {
251 #[cfg(test)]
252 pub fn new(height: Height, price: GasPrice, percentage: u16) -> Self {
253 let latest_gas_price = LatestGasPrice::new(height, price);
254 Self {
255 latest_gas_price,
256 percentage,
257 }
258 }
259
260 pub fn new_from_inner(
261 inner: LatestGasPrice<Height, GasPrice>,
262 percentage: u16,
263 ) -> Self {
264 Self {
265 latest_gas_price: inner,
266 percentage,
267 }
268 }
269}
270
271impl<Height: Copy, GasPrice: Copy> UniversalGasPriceProvider<Height, GasPrice> {
272 fn get_height_and_gas_price(&self) -> (Height, GasPrice) {
273 self.latest_gas_price.get()
274 }
275}
276
277impl UniversalGasPriceProvider<u32, u64> {
278 pub fn inner_next_gas_price(&self) -> u64 {
279 let (_, latest_price) = self.get_height_and_gas_price();
280 let percentage = self.percentage;
281
282 let change = latest_price
283 .saturating_mul(percentage as u64)
284 .saturating_div(100);
285
286 latest_price.saturating_add(change)
287 }
288}
289
290impl TxPoolGasPriceProvider for UniversalGasPriceProvider<u32, u64> {
291 fn next_gas_price(&self) -> fuel_core_txpool::GasPrice {
292 self.inner_next_gas_price()
293 }
294}
295
296impl GasPriceEstimate for UniversalGasPriceProvider<u32, u64> {
297 fn worst_case_gas_price(&self, height: BlockHeight) -> Option<u64> {
298 let (best_height, best_gas_price) = self.get_height_and_gas_price();
299 let percentage = self.percentage;
300
301 let worst = cumulative_percentage_change(
302 best_gas_price,
303 best_height,
304 percentage as u64,
305 height.into(),
306 );
307 Some(worst)
308 }
309}
310
311#[derive(Clone)]
312pub struct PoAAdapter {
313 shared_state: Option<fuel_core_poa::service::SharedState>,
314}
315
316#[derive(Clone)]
317pub struct TxPoolAdapter {
318 service: TxPoolSharedState,
319}
320
321impl TxPoolAdapter {
322 pub fn new(service: TxPoolSharedState) -> Self {
323 Self { service }
324 }
325}
326
327pub struct TransactionsSource {
328 tx_pool: TxPoolSharedState,
329 minimum_gas_price: u64,
330}
331
332impl TransactionsSource {
333 pub fn new(minimum_gas_price: u64, tx_pool: TxPoolSharedState) -> Self {
334 Self {
335 tx_pool,
336 minimum_gas_price,
337 }
338 }
339}
340
341pub struct NewTxWaiter {
342 pub receiver: watch::Receiver<()>,
343 pub timeout: Instant,
344}
345
346impl NewTxWaiter {
347 pub fn new(receiver: watch::Receiver<()>, timeout: Instant) -> Self {
348 Self { receiver, timeout }
349 }
350}
351
352#[derive(Clone)]
353pub struct PreconfirmationSender {
354 pub sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
355 pub tx_status_manager_adapter: TxStatusManagerAdapter,
356}
357
358impl PreconfirmationSender {
359 pub fn new(
360 sender_signature_service: mpsc::Sender<Vec<Preconfirmation>>,
361 tx_status_manager_adapter: TxStatusManagerAdapter,
362 ) -> Self {
363 Self {
364 sender_signature_service,
365 tx_status_manager_adapter,
366 }
367 }
368}
369
370#[derive(Clone)]
371pub struct ExecutorAdapter {
372 pub(crate) executor: Arc<Executor<Database, Database<Relayer>>>,
373 pub new_txs_watcher: watch::Receiver<()>,
374 pub preconfirmation_sender: PreconfirmationSender,
375}
376
377impl ExecutorAdapter {
378 pub fn new(
379 database: Database,
380 relayer_database: Database<Relayer>,
381 config: fuel_core_upgradable_executor::config::Config,
382 new_txs_watcher: watch::Receiver<()>,
383 preconfirmation_sender: PreconfirmationSender,
384 ) -> Self {
385 let executor = Executor::new(database, relayer_database, config);
386 Self {
387 executor: Arc::new(executor),
388 new_txs_watcher,
389 preconfirmation_sender,
390 }
391 }
392
393 pub fn produce_without_commit_from_vector(
394 &self,
395 component: Components<Vec<Transaction>>,
396 ) -> ExecutorResult<UncommittedResult<Changes>> {
397 let new_components = Components {
398 header_to_produce: component.header_to_produce,
399 transactions_source: OnceTransactionsSource::new(
400 component.transactions_source,
401 ),
402 gas_price: component.gas_price,
403 coinbase_recipient: component.coinbase_recipient,
404 };
405
406 self.executor
407 .produce_without_commit_with_source_direct_resolve(new_components)
408 }
409}
410
411#[derive(Clone)]
412pub struct VerifierAdapter {
413 pub block_verifier: Arc<Verifier<Database>>,
414}
415
416#[derive(Clone)]
417pub struct ConsensusAdapter {
418 pub block_verifier: Arc<Verifier<Database>>,
419 pub config: RelayerConsensusConfig,
420 pub maybe_relayer: MaybeRelayerAdapter,
421}
422
423impl ConsensusAdapter {
424 pub fn new(
425 block_verifier: VerifierAdapter,
426 config: RelayerConsensusConfig,
427 maybe_relayer: MaybeRelayerAdapter,
428 ) -> Self {
429 Self {
430 block_verifier: block_verifier.block_verifier,
431 config,
432 maybe_relayer,
433 }
434 }
435}
436
437#[derive(Clone)]
438pub struct MaybeRelayerAdapter {
439 #[cfg(feature = "relayer")]
440 pub relayer_synced: Option<fuel_core_relayer::SharedState>,
441 #[cfg(feature = "relayer")]
442 pub relayer_database: Database<Relayer>,
443 #[cfg(feature = "relayer")]
444 pub da_deploy_height: fuel_core_types::blockchain::primitives::DaBlockHeight,
445}
446
447#[derive(Clone)]
448pub struct BlockProducerAdapter {
449 pub block_producer: Arc<BlockProducerService>,
450}
451
452#[derive(Clone)]
453pub struct BlockImporterAdapter {
454 pub block_importer: Arc<fuel_core_importer::Importer>,
455 pub database: Database,
456}
457
458impl BlockImporterAdapter {
459 pub fn events(&self) -> BoxStream<ImporterResult> {
460 use futures::StreamExt;
461 fuel_core_services::stream::IntoBoxStream::into_boxed(
462 tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
463 .filter_map(|r| futures::future::ready(r.ok())),
464 )
465 }
466
467 pub fn events_shared_result(&self) -> BoxStream<SharedImportResult> {
468 use futures::StreamExt;
469 fuel_core_services::stream::IntoBoxStream::into_boxed(
470 tokio_stream::wrappers::BroadcastStream::new(self.block_importer.subscribe())
471 .filter_map(|r| futures::future::ready(r.ok()))
472 .map(|r| r.shared_result),
473 )
474 }
475}
476
477#[derive(Clone)]
478pub struct TxStatusManagerAdapter {
479 tx_status_manager_shared_data: SharedData,
480}
481
482impl Deref for TxStatusManagerAdapter {
483 type Target = SharedData;
484
485 fn deref(&self) -> &Self::Target {
486 &self.tx_status_manager_shared_data
487 }
488}
489
490impl TxStatusManagerAdapter {
491 pub fn new(tx_status_manager_shared_data: SharedData) -> Self {
492 Self {
493 tx_status_manager_shared_data,
494 }
495 }
496
497 pub async fn tx_update_subscribe(
499 &self,
500 tx_id: Bytes32,
501 ) -> anyhow::Result<TxStatusStream> {
502 self.tx_status_manager_shared_data.subscribe(tx_id).await
503 }
504}
505
506#[derive(Clone)]
507pub struct FuelBlockSigner {
508 mode: SignMode,
509}
510impl FuelBlockSigner {
511 pub fn new(mode: SignMode) -> Self {
512 Self { mode }
513 }
514}
515
516#[async_trait::async_trait]
517impl BlockSigner for FuelBlockSigner {
518 async fn seal_block(&self, block: &Block) -> anyhow::Result<Consensus> {
519 let block_hash = block.id();
520 let message = block_hash.into_message();
521 let signature = self.mode.sign_message(message).await?;
522 Ok(Consensus::PoA(PoAConsensus::new(signature)))
523 }
524
525 fn is_available(&self) -> bool {
526 self.mode.is_available()
527 }
528}
529
530#[cfg(feature = "shared-sequencer")]
531#[async_trait::async_trait]
532impl fuel_core_shared_sequencer::ports::Signer for FuelBlockSigner {
533 async fn sign(
534 &self,
535 data: &[u8],
536 ) -> anyhow::Result<fuel_core_types::fuel_crypto::Signature> {
537 Ok(self.mode.sign(data).await?)
538 }
539
540 fn public_key(&self) -> cosmrs::crypto::PublicKey {
541 let pubkey = self
542 .mode
543 .verifying_key()
544 .expect("Invalid public key")
545 .expect("Public key not available");
546 cosmrs::crypto::PublicKey::from(pubkey)
547 }
548
549 fn is_available(&self) -> bool {
550 self.mode.is_available()
551 }
552}
553
554#[cfg(feature = "p2p")]
555#[derive(Clone)]
556pub struct P2PAdapter {
557 service: Option<fuel_core_p2p::service::SharedState>,
558 peer_report_config: PeerReportConfig,
559}
560
561#[cfg(feature = "p2p")]
562#[derive(Clone, Default)]
563pub struct PeerReportConfig {
564 pub successful_block_import: AppScore,
565 pub missing_block_headers: AppScore,
566 pub bad_block_header: AppScore,
567 pub missing_transactions: AppScore,
568 pub invalid_transactions: AppScore,
569}
570
571#[cfg(not(feature = "p2p"))]
572#[derive(Default, Clone)]
573pub struct P2PAdapter;
574
575#[cfg(feature = "p2p")]
576impl P2PAdapter {
577 pub fn new(
578 service: Option<fuel_core_p2p::service::SharedState>,
579 peer_report_config: PeerReportConfig,
580 ) -> Self {
581 Self {
582 service,
583 peer_report_config,
584 }
585 }
586}
587
588#[cfg(not(feature = "p2p"))]
589impl P2PAdapter {
590 pub fn new() -> Self {
591 Default::default()
592 }
593}
594
595#[derive(Clone)]
596pub struct SharedMemoryPool {
597 memory_pool: MemoryPool,
598}
599
600impl SharedMemoryPool {
601 pub fn new(number_of_instances: usize) -> Self {
602 Self {
603 memory_pool: MemoryPool::new(number_of_instances),
604 }
605 }
606}
607
608pub struct SystemTime;
609
610impl fuel_core_poa::ports::GetTime for SystemTime {
611 fn now(&self) -> Tai64 {
612 Tai64::now()
613 }
614}