// fuel_core/service.rs

1use std::{
2    net::SocketAddr,
3    sync::Arc,
4};
5
6use adapters::{
7    TxStatusManagerAdapter,
8    ready_signal::ReadySignal,
9};
10pub use config::{
11    Config,
12    DbType,
13    RelayerConsensusConfig,
14};
15use fuel_core_chain_config::{
16    ConsensusConfig,
17    GenesisCommitment,
18};
19use fuel_core_poa::{
20    ports::BlockImporter,
21    verifier::verify_consensus,
22};
23pub use fuel_core_services::Service as ServiceTrait;
24use fuel_core_services::{
25    RunnableService,
26    RunnableTask,
27    ServiceRunner,
28    State,
29    StateWatcher,
30    TaskNextAction,
31};
32use fuel_core_storage::{
33    IsNotFound,
34    StorageAsMut,
35    not_found,
36    tables::SealedBlockConsensus,
37    transactional::{
38        AtomicView,
39        ReadTransaction,
40        StorageChanges,
41    },
42};
43use fuel_core_types::{
44    blockchain::consensus::Consensus,
45    fuel_types::BlockHeight,
46};
47
48use self::adapters::BlockImporterAdapter;
49use crate::{
50    combined_database::{
51        CombinedDatabase,
52        ShutdownListener,
53    },
54    database::Database,
55    service::{
56        adapters::{
57            ExecutorAdapter,
58            PoAAdapter,
59        },
60        sub_services::TxPoolSharedState,
61    },
62};
63
64#[cfg(feature = "rpc")]
65use crate::database::database_description::block_aggregator::BlockAggregatorDatabase;
66#[cfg(feature = "rpc")]
67use fuel_core_block_aggregator_api::db::storage_or_remote_db::StorageOrRemoteBlocksProvider;
68
// Sub-modules making up the `FuelService` implementation.
pub mod adapters;
pub mod config;
pub mod genesis;
pub mod metrics;
mod query;
pub mod sub_services;
pub mod vm_pool;
76
/// The cloneable shared state of the service, exposing the shared handles of
/// every sub-service to API users and to the `ServiceRunner`.
#[derive(Clone)]
pub struct SharedState {
    /// The PoA adapter around the shared state of the consensus module.
    pub poa_adapter: PoAAdapter,
    /// The transaction pool shared state.
    pub txpool_shared_state: TxPoolSharedState,
    /// The Tx Status Manager
    pub tx_status_manager: TxStatusManagerAdapter,
    /// The P2P network shared state.
    #[cfg(feature = "p2p")]
    pub network: Option<fuel_core_p2p::service::SharedState>,
    #[cfg(feature = "relayer")]
    /// The Relayer shared state.
    pub relayer: Option<fuel_core_relayer::SharedState>,
    /// The GraphQL shared state.
    pub graph_ql: crate::fuel_core_graphql_api::api_service::SharedState,
    /// The underlying database.
    pub database: CombinedDatabase,
    /// Subscribe to new block production.
    pub block_importer: BlockImporterAdapter,
    /// The executor to validate blocks.
    pub executor: ExecutorAdapter,
    /// The config of the service.
    pub config: Config,
    /// The compression service shared data.
    pub compression: Option<fuel_core_compression_service::service::SharedData>,
    /// The gas price service shared data.
    pub gas_price_service: fuel_core_gas_price_service::v1::service::SharedData,
    #[cfg(feature = "rpc")]
    /// The block aggregator RPC shared data.
    pub block_aggregator_rpc: Option<
        fuel_core_block_aggregator_api::service::SharedState<
            StorageOrRemoteBlocksProvider<Database<BlockAggregatorDatabase>>,
        >,
    >,
}
113
/// The top-level Fuel node service: owns the sub-services and the runner that
/// drives their lifecycle.
pub struct FuelService {
    /// The `ServiceRunner` used for `FuelService`.
    ///
    /// # Dev-note: The `FuelService` is already exposed as a public API and used by many crates.
    /// To provide a user-friendly API and avoid breaking many downstream crates, `ServiceRunner`
    /// is wrapped inside.
    runner: ServiceRunner<Task>,
    /// The list of started sub-services of the node.
    pub sub_services: Arc<SubServices>,
    /// The shared state of the service
    pub shared: SharedState,
    /// The address bound by the system for serving the API
    pub bound_address: SocketAddr,
    /// RPC address
    #[cfg(feature = "rpc")]
    pub rpc_address: Option<SocketAddr>,
}
131
132impl Drop for FuelService {
133    fn drop(&mut self) {
134        self.send_stop_signal();
135    }
136}
137
138impl FuelService {
    /// Creates a `FuelService` instance from service config
    ///
    /// The service is only constructed here; call
    /// [`FuelService::start_and_await`] to actually run it.
    #[tracing::instrument(skip_all, fields(name = %config.name))]
    pub fn new<Shutdown>(
        mut database: CombinedDatabase,
        config: Config,
        shutdown_listener: &mut Shutdown,
    ) -> anyhow::Result<Self>
    where
        Shutdown: ShutdownListener,
    {
        // Normalize the config first so every sub-service sees a consistent view.
        let config = config.make_config_consistent();

        // initialize state
        tracing::info!("Initializing database");
        database.check_version()?;

        // Roll the database back if the consensus overrides in the config
        // conflict with already-stored sealed blocks.
        Self::make_database_compatible_with_config(
            &mut database,
            &config,
            shutdown_listener,
        )?;

        // initialize sub services
        database.sync_aux_db_heights(shutdown_listener)?;

        // Fired by `Task::into_task` once all sub-services have started.
        let block_production_ready_signal = ReadySignal::new();

        let (services, shared) = sub_services::init_sub_services(
            &config,
            database.clone(),
            block_production_ready_signal.clone(),
        )?;

        let sub_services = Arc::new(services);
        let task = Task::new(sub_services.clone(), database, shared.clone())?;
        let runner = ServiceRunner::new_with_params(
            task,
            TaskParams {
                block_production_ready_signal,
            },
        );
        let bound_address = runner.shared.graph_ql.bound_address;

        // The RPC address exists only when the block aggregator RPC is enabled.
        #[cfg(feature = "rpc")]
        let rpc_address = runner
            .shared
            .block_aggregator_rpc
            .clone()
            .map(|state| state.bound_address);

        Ok(FuelService {
            sub_services,
            bound_address,
            shared,
            runner,
            #[cfg(feature = "rpc")]
            rpc_address,
        })
    }
198
199    /// Creates and starts fuel node instance from service config
200    pub async fn new_node(config: Config) -> anyhow::Result<Self> {
201        // initialize database
202        let combined_database =
203            CombinedDatabase::from_config(&config.combined_db_config)?;
204
205        Self::from_combined_database(combined_database, config).await
206    }
207
    /// Creates and starts fuel node instance from service config and a pre-existing on-chain database
    pub async fn from_database(
        database: Database,
        config: Config,
    ) -> anyhow::Result<Self> {
        // Only the on-chain database is provided; the remaining databases
        // (presumably off-chain/relayer/gas-price/etc. — see
        // `CombinedDatabase::new` for the exact positions) start from their
        // default state.
        let combined_database = CombinedDatabase::new(
            database,
            Default::default(),
            Default::default(),
            Default::default(),
            Default::default(),
            #[cfg(feature = "rpc")]
            Default::default(),
        );
        Self::from_combined_database(combined_database, config).await
    }
224
225    /// Creates and starts fuel node instance from service config and a pre-existing combined database
226    pub async fn from_combined_database(
227        combined_database: CombinedDatabase,
228        config: Config,
229    ) -> anyhow::Result<Self> {
230        let mut listener = crate::ShutdownListener::spawn();
231        let service = Self::new(combined_database, config, &mut listener)?;
232        let state = service.start_and_await().await?;
233
234        if !state.started() {
235            return Err(anyhow::anyhow!(
236                "The state of the service is not started: {state:?}"
237            ));
238        }
239        Ok(service)
240    }
241
242    #[cfg(feature = "relayer")]
243    /// Wait for the Relayer to be in sync with
244    /// the data availability layer.
245    ///
246    /// Yields until the relayer reaches a point where it
247    /// considered up to date. Note that there's no guarantee
248    /// the relayer will ever catch up to the da layer and
249    /// may fall behind immediately after this future completes.
250    ///
251    /// The only guarantee is that if this future completes then
252    /// the relayer did reach consistency with the da layer for
253    /// some period of time.
254    pub async fn await_relayer_synced(&self) -> anyhow::Result<()> {
255        if let Some(relayer_handle) = &self.runner.shared.relayer {
256            relayer_handle.await_synced().await?;
257        }
258        Ok(())
259    }
260
261    /// Waits until the compression service has synced
262    /// with the given block height
263    pub async fn await_compression_synced_until(
264        &self,
265        block_height: &BlockHeight,
266    ) -> anyhow::Result<()> {
267        if let Some(sync_observer) = &self.runner.shared.compression {
268            sync_observer.await_synced_until(block_height).await?;
269        }
270        Ok(())
271    }
272
273    /// Waits until the gas price service has synced
274    /// with current l2 block height
275    pub async fn await_gas_price_synced(&self) -> anyhow::Result<()> {
276        let _ = &self.runner.shared.gas_price_service.await_synced().await?;
277        Ok(())
278    }
279
    /// Aligns the database with the start-up consensus configuration: if a
    /// PoA V2 override height disagrees with the consensus seal already
    /// stored at that height, the database is rolled back to just before the
    /// conflicting height.
    fn make_database_compatible_with_config<Shutdown>(
        combined_database: &mut CombinedDatabase,
        config: &Config,
        shutdown_listener: &mut Shutdown,
    ) -> anyhow::Result<()>
    where
        Shutdown: ShutdownListener,
    {
        let start_up_consensus_config = &config.snapshot_reader.chain_config().consensus;

        let mut found_override_height = None;
        match start_up_consensus_config {
            ConsensusConfig::PoA { .. } => {
                // We don't support overriding of the heights for PoA version 1.
            }
            ConsensusConfig::PoAV2(poa) => {
                let on_chain_view = combined_database.on_chain().latest_view()?;

                for override_height in poa.get_all_overrides().keys() {
                    let Some(current_height) = on_chain_view.maybe_latest_height()?
                    else {
                        // Database is empty, nothing to rollback
                        return Ok(());
                    };

                    // Overrides above the current tip cannot conflict with
                    // any stored block yet.
                    if override_height > &current_height {
                        return Ok(());
                    }

                    let block_header = on_chain_view
                        .get_sealed_block_header(override_height)?
                        .ok_or(not_found!("SealedBlockHeader"))?;
                    let header = block_header.entity;
                    let seal = block_header.consensus;

                    if let Consensus::PoA(poa_seal) = seal {
                        // Re-verify the stored seal under the start-up
                        // config; a failure means the override changed the
                        // rules for an already-sealed block.
                        let block_valid = verify_consensus(
                            start_up_consensus_config,
                            &header,
                            &poa_seal,
                        );

                        if !block_valid {
                            found_override_height = Some(override_height);
                        }
                    } else {
                        return Err(anyhow::anyhow!(
                            "The consensus at override height {override_height} is not PoA."
                        ));
                    };
                }
            }
        }

        if let Some(override_height) = found_override_height {
            // Roll back to the predecessor of the conflicting height so the
            // state at and above the override height is discarded.
            let rollback_height = override_height.pred().ok_or(anyhow::anyhow!(
                "The override height is zero. \
                The override height should be greater than zero."
            ))?;
            tracing::warn!(
                "The consensus at override height {override_height} \
                does not match with the database. \
                Rollbacking the database to the height {rollback_height}"
            );
            combined_database.rollback_to(rollback_height, shutdown_listener)?;
        }

        Ok(())
    }
349
    /// Reconciles the stored genesis consensus with the current chain config:
    /// if the chain config hash recorded at genesis differs from the hash of
    /// the config the node was started with, the genesis consensus entry is
    /// rewritten with the new hash.
    fn override_chain_config_if_needed(&self) -> anyhow::Result<()> {
        let chain_config = self.shared.config.snapshot_reader.chain_config();
        let on_chain_view = self.shared.database.on_chain().latest_view()?;
        // Hash of the chain config the node was started with.
        let chain_config_hash = chain_config.root()?.into();
        let mut initialized_genesis = on_chain_view.get_genesis()?;
        let genesis_chain_config_hash = initialized_genesis.chain_config_hash;

        if genesis_chain_config_hash != chain_config_hash {
            tracing::warn!(
                "The genesis chain config hash({genesis_chain_config_hash}) \
                is different from the current one({chain_config_hash}). \
                Updating the genesis consensus parameters."
            );

            let genesis_block_height =
                on_chain_view.genesis_height()?.ok_or(anyhow::anyhow!(
                    "The genesis block height is not found in the database \
                    during overriding the chain config hash."
                ))?;
            let mut database_tx = on_chain_view.read_transaction();

            // Overwrite the stored genesis consensus with the updated hash
            // and commit the change atomically at the genesis height.
            initialized_genesis.chain_config_hash = chain_config_hash;
            database_tx
                .storage_as_mut::<SealedBlockConsensus>()
                .insert(
                    &genesis_block_height,
                    &Consensus::Genesis(initialized_genesis),
                )?;

            self.shared.database.on_chain().data.commit_changes(
                Some(genesis_block_height),
                StorageChanges::Changes(database_tx.into_changes()),
            )?;
        }

        Ok(())
    }
387
    /// Ensures the chain is initialized before the node starts: executes the
    /// genesis block when the database has none yet, recovers any missing
    /// genesis-derived tables, and reconciles the stored chain config hash
    /// with the current config.
    async fn prepare_genesis(&self, watcher: &StateWatcher) -> anyhow::Result<()> {
        // check if chain is initialized
        if let Err(err) = self.shared.database.on_chain().latest_view()?.get_genesis()
            && err.is_not_found()
        {
            // No genesis found: produce and commit the genesis block now.
            let result = genesis::execute_genesis_block(
                watcher.clone(),
                &self.shared.config,
                &self.shared.database,
            )
            .await?;

            self.shared.block_importer.commit_result(result).await?;
        }

        // repopulate missing tables
        genesis::recover_missing_tables_from_genesis_state_config(
            watcher.clone(),
            &self.shared.config,
            &self.shared.database,
        )
        .await?;

        self.override_chain_config_if_needed()
    }
413}
414
impl FuelService {
    /// Start all sub services and await for them to start.
    pub async fn start_and_await(&self) -> anyhow::Result<State> {
        let watcher = self.runner.state_watcher();
        // Genesis must be in place before any sub-service starts.
        self.prepare_genesis(&watcher).await?;
        self.runner.start_and_await().await
    }

    /// Sends the stop signal to all sub services.
    ///
    /// Returns whatever the underlying runner reports for the stop request.
    pub fn send_stop_signal(&self) -> bool {
        self.runner.stop()
    }

    /// Awaits for all services to shutdown.
    pub async fn await_shutdown(&self) -> anyhow::Result<State> {
        self.runner.await_stop().await
    }

    /// Sends the stop signal to all sub services and awaits for all services to shutdown.
    pub async fn send_stop_signal_and_await_shutdown(&self) -> anyhow::Result<State> {
        self.runner.stop_and_await().await
    }

    /// Returns the current lifecycle state of the service runner.
    pub fn state(&self) -> State {
        self.runner.state()
    }

    /// Returns the list of sub-services owned by this node.
    pub fn sub_services(&self) -> &SubServices {
        self.sub_services.as_ref()
    }
}
446
/// The set of boxed sub-services owned by the [`FuelService`].
pub type SubServices = Vec<Box<dyn ServiceTrait + Send + Sync + 'static>>;
448
/// The top-level runnable task that ties the lifetime of all sub-services
/// together.
struct Task {
    /// The list of started sub services.
    services: Arc<SubServices>,
    /// The copies of the databases used by services.
    database: CombinedDatabase,
    /// The shared state of the service.
    pub shared: SharedState,
}
457
458impl Task {
459    /// Private inner method for initializing the fuel service task
460    pub fn new(
461        services: Arc<SubServices>,
462        database: CombinedDatabase,
463        shared: SharedState,
464    ) -> anyhow::Result<Task> {
465        Ok(Task {
466            services,
467            database,
468            shared,
469        })
470    }
471}
472
/// Parameters handed to [`Task`] when the `ServiceRunner` converts it into a
/// running task.
#[derive(Default)]
struct TaskParams {
    // Fired by `Task::into_task` once all sub-services have been started.
    block_production_ready_signal: ReadySignal,
}
477
#[async_trait::async_trait]
impl RunnableService for Task {
    const NAME: &'static str = "FuelService";
    type SharedData = SharedState;
    type Task = Task;
    type TaskParams = TaskParams;

    fn shared_data(&self) -> Self::SharedData {
        self.shared.clone()
    }

    /// Starts every sub-service in order, then fires the block-production
    /// ready signal.
    async fn into_task(
        mut self,
        watcher: &StateWatcher,
        params: Self::TaskParams,
    ) -> anyhow::Result<Self::Task> {
        let mut watcher = watcher.clone();

        for service in self.services.iter() {
            tokio::select! {
                // Abort start-up early if a stop was requested while the
                // sub-services were still coming up.
                _ = watcher.wait_stopping_or_stopped() => {
                    break;
                }
                result = service.start_and_await() => {
                    // A failed sub-service start aborts the whole start-up.
                    result?;
                }
            }
        }

        params.block_production_ready_signal.send_ready_signal();

        Ok(self)
    }
}
512
513impl RunnableTask for Task {
514    #[tracing::instrument(skip_all)]
515    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
516        let mut stop_signals = vec![];
517        for service in self.services.iter() {
518            stop_signals.push(service.await_stop())
519        }
520        stop_signals.push(Box::pin(watcher.while_started()));
521
522        let (result, _, _) = futures::future::select_all(stop_signals).await;
523
524        if let Err(err) = result {
525            tracing::error!("Got an error during listen for shutdown: {}", err);
526        }
527
528        // We received the stop signal from any of one source, so stop this service and
529        // all sub-services.
530        TaskNextAction::Stop
531    }
532
533    async fn shutdown(self) -> anyhow::Result<()> {
534        for service in self.services.iter() {
535            let result = service.stop_and_await().await;
536
537            if let Err(err) = result {
538                tracing::error!(
539                    "Got and error during awaiting for stop of the service: {}",
540                    err
541                );
542            }
543        }
544        self.database.shutdown();
545        Ok(())
546    }
547}
548
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use crate::{
        ShutdownListener,
        service::{
            Config,
            FuelService,
        },
    };
    use fuel_core_services::State;
    use std::{
        thread::sleep,
        time::Duration,
    };

    #[tokio::test]
    async fn stop_sub_service_shutdown_all_services() {
        // The test verifies that stopping any single sub-service shuts down
        // the whole node. Iteration `i` boots a fresh node and stops its
        // `i`-th sub-service; the loop ends once `i` runs past the number of
        // sub-services.
        let mut i = 0;
        loop {
            let mut shutdown = ShutdownListener::spawn();
            #[cfg(not(feature = "rpc"))]
            let config = Config::local_node();
            #[cfg(feature = "rpc")]
            let config = Config::local_node_with_rpc();
            let service =
                FuelService::new(Default::default(), config, &mut shutdown).unwrap();
            service.start_and_await().await.unwrap();
            // Give the services a moment to settle into `Started`.
            sleep(Duration::from_secs(1));
            for service in service.sub_services() {
                assert_eq!(service.state(), State::Started);
            }

            if i < service.sub_services().len() {
                service.sub_services()[i].stop_and_await().await.unwrap();
                // Stopping one sub-service must bring the whole node down
                // within a bounded amount of time.
                tokio::time::timeout(Duration::from_secs(5), service.await_shutdown())
                    .await
                    .expect("Failed to stop the service in reasonable period of time")
                    .expect("Failed to stop the service");
            } else {
                break;
            }
            i += 1;
        }

        // Current services:
        // -    tx status manager
        // -    graphql
        // -    graphql worker
        // -    txpool
        // -    PoA
        // -    gas price service
        // -    chain info provider
        #[allow(unused_mut)]
        #[cfg(not(feature = "rpc"))]
        let mut expected_services = 7;
        #[cfg(feature = "rpc")]
        let mut expected_services = 8;

        // Relayer service is disabled with `Config::local_node`.
        // #[cfg(feature = "relayer")]
        // {
        //     expected_services += 1;
        // }
        #[cfg(feature = "p2p")]
        {
            // p2p & sync & preconfirmation signature service
            expected_services += 3;
        }
        #[cfg(feature = "shared-sequencer")]
        {
            expected_services += 1;
        }

        // # Dev-note: Update the `expected_services` when we add/remove a new/old service.
        assert_eq!(i, expected_services);
    }

    #[tokio::test]
    async fn stop_and_await___stops_all_services() {
        let mut shutdown = ShutdownListener::spawn();
        let service =
            FuelService::new(Default::default(), Config::local_node(), &mut shutdown)
                .unwrap();
        service.start_and_await().await.unwrap();
        // Capture one state watcher per sub-service before stopping so each
        // service's final state can be inspected afterwards.
        let sub_services_watchers: Vec<_> = service
            .sub_services()
            .iter()
            .map(|s| s.state_watcher())
            .collect();

        sleep(Duration::from_secs(1));
        for service in service.sub_services() {
            assert_eq!(service.state(), State::Started);
        }
        service.send_stop_signal_and_await_shutdown().await.unwrap();

        for mut service in sub_services_watchers {
            // Check that the state is `Stopped`(not `StoppedWithError`)
            assert_eq!(service.borrow_and_update().clone(), State::Stopped);
        }
    }
}