// fuel_core/service.rs

1use std::{
2    net::SocketAddr,
3    sync::Arc,
4};
5
6use adapters::{
7    TxStatusManagerAdapter,
8    ready_signal::ReadySignal,
9};
10pub use config::{
11    Config,
12    DbType,
13    RelayerConsensusConfig,
14};
15use fuel_core_chain_config::{
16    ConsensusConfig,
17    GenesisCommitment,
18};
19use fuel_core_poa::{
20    ports::BlockImporter,
21    verifier::verify_consensus,
22};
23pub use fuel_core_services::Service as ServiceTrait;
24use fuel_core_services::{
25    RunnableService,
26    RunnableTask,
27    ServiceRunner,
28    State,
29    StateWatcher,
30    TaskNextAction,
31};
32use fuel_core_storage::{
33    IsNotFound,
34    StorageAsMut,
35    not_found,
36    tables::SealedBlockConsensus,
37    transactional::{
38        AtomicView,
39        ReadTransaction,
40        StorageChanges,
41    },
42};
43use fuel_core_types::{
44    blockchain::consensus::Consensus,
45    fuel_types::BlockHeight,
46};
47
48use crate::{
49    combined_database::{
50        CombinedDatabase,
51        ShutdownListener,
52    },
53    database::Database,
54    service::{
55        adapters::{
56            ExecutorAdapter,
57            PoAAdapter,
58        },
59        sub_services::TxPoolSharedState,
60    },
61};
62
63use self::adapters::BlockImporterAdapter;
64
65pub mod adapters;
66pub mod config;
67pub mod genesis;
68pub mod metrics;
69mod query;
70pub mod sub_services;
71pub mod vm_pool;
72
73#[derive(Clone)]
74pub struct SharedState {
75    /// The PoA adaptor around the shared state of the consensus module.
76    pub poa_adapter: PoAAdapter,
77    /// The transaction pool shared state.
78    pub txpool_shared_state: TxPoolSharedState,
79    /// The Tx Status Manager
80    pub tx_status_manager: TxStatusManagerAdapter,
81    /// The P2P network shared state.
82    #[cfg(feature = "p2p")]
83    pub network: Option<fuel_core_p2p::service::SharedState>,
84    #[cfg(feature = "relayer")]
85    /// The Relayer shared state.
86    pub relayer: Option<fuel_core_relayer::SharedState>,
87    /// The GraphQL shared state.
88    pub graph_ql: crate::fuel_core_graphql_api::api_service::SharedState,
89    /// The underlying database.
90    pub database: CombinedDatabase,
91    /// Subscribe to new block production.
92    pub block_importer: BlockImporterAdapter,
93    /// The executor to validate blocks.
94    pub executor: ExecutorAdapter,
95    /// The config of the service.
96    pub config: Config,
97    /// The compression service shared data.
98    pub compression: Option<fuel_core_compression_service::service::SharedData>,
99    /// The gas price service shared data.
100    pub gas_price_service: fuel_core_gas_price_service::v1::service::SharedData,
101}
102
pub struct FuelService {
    /// The `ServiceRunner` used for `FuelService`.
    ///
    /// # Dev-note: The `FuelService` is already exposed as a public API and used by many crates.
    /// To provide a user-friendly API and avoid breaking many downstream crates, `ServiceRunner`
    /// is wrapped inside.
    runner: ServiceRunner<Task>,
    /// The list of sub-services run by the `FuelService`.
    pub sub_services: Arc<SubServices>,
    /// The shared state of the service
    pub shared: SharedState,
    /// The address bound by the system for serving the API
    pub bound_address: SocketAddr,
}
117
118impl Drop for FuelService {
119    fn drop(&mut self) {
120        self.send_stop_signal();
121    }
122}
123
124impl FuelService {
125    /// Creates a `FuelService` instance from service config
126    #[tracing::instrument(skip_all, fields(name = %config.name))]
127    pub fn new<Shutdown>(
128        mut database: CombinedDatabase,
129        config: Config,
130        shutdown_listener: &mut Shutdown,
131    ) -> anyhow::Result<Self>
132    where
133        Shutdown: ShutdownListener,
134    {
135        let config = config.make_config_consistent();
136
137        // initialize state
138        tracing::info!("Initializing database");
139        database.check_version()?;
140
141        Self::make_database_compatible_with_config(
142            &mut database,
143            &config,
144            shutdown_listener,
145        )?;
146
147        // initialize sub services
148        tracing::info!("Initializing sub services");
149        database.sync_aux_db_heights(shutdown_listener)?;
150
151        let block_production_ready_signal = ReadySignal::new();
152
153        let (services, shared) = sub_services::init_sub_services(
154            &config,
155            database.clone(),
156            block_production_ready_signal.clone(),
157        )?;
158
159        let sub_services = Arc::new(services);
160        let task = Task::new(sub_services.clone(), database, shared.clone())?;
161        let runner = ServiceRunner::new_with_params(
162            task,
163            TaskParams {
164                block_production_ready_signal,
165            },
166        );
167        let bound_address = runner.shared.graph_ql.bound_address;
168
169        Ok(FuelService {
170            sub_services,
171            bound_address,
172            shared,
173            runner,
174        })
175    }
176
177    /// Creates and starts fuel node instance from service config
178    pub async fn new_node(config: Config) -> anyhow::Result<Self> {
179        // initialize database
180        let combined_database =
181            CombinedDatabase::from_config(&config.combined_db_config)?;
182
183        Self::from_combined_database(combined_database, config).await
184    }
185
186    /// Creates and starts fuel node instance from service config and a pre-existing on-chain database
187    pub async fn from_database(
188        database: Database,
189        config: Config,
190    ) -> anyhow::Result<Self> {
191        let combined_database = CombinedDatabase::new(
192            database,
193            Default::default(),
194            Default::default(),
195            Default::default(),
196            Default::default(),
197        );
198        Self::from_combined_database(combined_database, config).await
199    }
200
201    /// Creates and starts fuel node instance from service config and a pre-existing combined database
202    pub async fn from_combined_database(
203        combined_database: CombinedDatabase,
204        config: Config,
205    ) -> anyhow::Result<Self> {
206        let mut listener = crate::ShutdownListener::spawn();
207        let service = Self::new(combined_database, config, &mut listener)?;
208        let state = service.start_and_await().await?;
209
210        if !state.started() {
211            return Err(anyhow::anyhow!(
212                "The state of the service is not started: {state:?}"
213            ));
214        }
215        Ok(service)
216    }
217
218    #[cfg(feature = "relayer")]
219    /// Wait for the Relayer to be in sync with
220    /// the data availability layer.
221    ///
222    /// Yields until the relayer reaches a point where it
223    /// considered up to date. Note that there's no guarantee
224    /// the relayer will ever catch up to the da layer and
225    /// may fall behind immediately after this future completes.
226    ///
227    /// The only guarantee is that if this future completes then
228    /// the relayer did reach consistency with the da layer for
229    /// some period of time.
230    pub async fn await_relayer_synced(&self) -> anyhow::Result<()> {
231        if let Some(relayer_handle) = &self.runner.shared.relayer {
232            relayer_handle.await_synced().await?;
233        }
234        Ok(())
235    }
236
237    /// Waits until the compression service has synced
238    /// with the given block height
239    pub async fn await_compression_synced_until(
240        &self,
241        block_height: &BlockHeight,
242    ) -> anyhow::Result<()> {
243        if let Some(sync_observer) = &self.runner.shared.compression {
244            sync_observer.await_synced_until(block_height).await?;
245        }
246        Ok(())
247    }
248
249    /// Waits until the gas price service has synced
250    /// with current l2 block height
251    pub async fn await_gas_price_synced(&self) -> anyhow::Result<()> {
252        let _ = &self.runner.shared.gas_price_service.await_synced().await?;
253        Ok(())
254    }
255
256    fn make_database_compatible_with_config<Shutdown>(
257        combined_database: &mut CombinedDatabase,
258        config: &Config,
259        shutdown_listener: &mut Shutdown,
260    ) -> anyhow::Result<()>
261    where
262        Shutdown: ShutdownListener,
263    {
264        let start_up_consensus_config = &config.snapshot_reader.chain_config().consensus;
265
266        let mut found_override_height = None;
267        match start_up_consensus_config {
268            ConsensusConfig::PoA { .. } => {
269                // We don't support overriding of the heights for PoA version 1.
270            }
271            ConsensusConfig::PoAV2(poa) => {
272                let on_chain_view = combined_database.on_chain().latest_view()?;
273
274                for override_height in poa.get_all_overrides().keys() {
275                    let Some(current_height) = on_chain_view.maybe_latest_height()?
276                    else {
277                        // Database is empty, nothing to rollback
278                        return Ok(());
279                    };
280
281                    if override_height > &current_height {
282                        return Ok(());
283                    }
284
285                    let block_header = on_chain_view
286                        .get_sealed_block_header(override_height)?
287                        .ok_or(not_found!("SealedBlockHeader"))?;
288                    let header = block_header.entity;
289                    let seal = block_header.consensus;
290
291                    if let Consensus::PoA(poa_seal) = seal {
292                        let block_valid = verify_consensus(
293                            start_up_consensus_config,
294                            &header,
295                            &poa_seal,
296                        );
297
298                        if !block_valid {
299                            found_override_height = Some(override_height);
300                        }
301                    } else {
302                        return Err(anyhow::anyhow!(
303                            "The consensus at override height {override_height} is not PoA."
304                        ));
305                    };
306                }
307            }
308        }
309
310        if let Some(override_height) = found_override_height {
311            let rollback_height = override_height.pred().ok_or(anyhow::anyhow!(
312                "The override height is zero. \
313                The override height should be greater than zero."
314            ))?;
315            tracing::warn!(
316                "The consensus at override height {override_height} \
317                does not match with the database. \
318                Rollbacking the database to the height {rollback_height}"
319            );
320            combined_database.rollback_to(rollback_height, shutdown_listener)?;
321        }
322
323        Ok(())
324    }
325
326    fn override_chain_config_if_needed(&self) -> anyhow::Result<()> {
327        let chain_config = self.shared.config.snapshot_reader.chain_config();
328        let on_chain_view = self.shared.database.on_chain().latest_view()?;
329        let chain_config_hash = chain_config.root()?.into();
330        let mut initialized_genesis = on_chain_view.get_genesis()?;
331        let genesis_chain_config_hash = initialized_genesis.chain_config_hash;
332
333        if genesis_chain_config_hash != chain_config_hash {
334            tracing::warn!(
335                "The genesis chain config hash({genesis_chain_config_hash}) \
336                is different from the current one({chain_config_hash}). \
337                Updating the genesis consensus parameters."
338            );
339
340            let genesis_block_height =
341                on_chain_view.genesis_height()?.ok_or(anyhow::anyhow!(
342                    "The genesis block height is not found in the database \
343                    during overriding the chain config hash."
344                ))?;
345            let mut database_tx = on_chain_view.read_transaction();
346
347            initialized_genesis.chain_config_hash = chain_config_hash;
348            database_tx
349                .storage_as_mut::<SealedBlockConsensus>()
350                .insert(
351                    &genesis_block_height,
352                    &Consensus::Genesis(initialized_genesis),
353                )?;
354
355            self.shared.database.on_chain().data.commit_changes(
356                Some(genesis_block_height),
357                StorageChanges::Changes(database_tx.into_changes()),
358            )?;
359        }
360
361        Ok(())
362    }
363
364    async fn prepare_genesis(&self, watcher: &StateWatcher) -> anyhow::Result<()> {
365        // check if chain is initialized
366        if let Err(err) = self.shared.database.on_chain().latest_view()?.get_genesis()
367            && err.is_not_found()
368        {
369            let result = genesis::execute_genesis_block(
370                watcher.clone(),
371                &self.shared.config,
372                &self.shared.database,
373            )
374            .await?;
375
376            self.shared.block_importer.commit_result(result).await?;
377        }
378
379        // repopulate missing tables
380        genesis::recover_missing_tables_from_genesis_state_config(
381            watcher.clone(),
382            &self.shared.config,
383            &self.shared.database,
384        )
385        .await?;
386
387        self.override_chain_config_if_needed()
388    }
389}
390
impl FuelService {
    /// Start all sub services and await for them to start.
    pub async fn start_and_await(&self) -> anyhow::Result<State> {
        let watcher = self.runner.state_watcher();
        // Genesis must be in place before the sub-services begin to run.
        self.prepare_genesis(&watcher).await?;
        self.runner.start_and_await().await
    }

    /// Sends the stop signal to all sub services.
    pub fn send_stop_signal(&self) -> bool {
        self.runner.stop()
    }

    /// Awaits for all services to shutdown.
    pub async fn await_shutdown(&self) -> anyhow::Result<State> {
        self.runner.await_stop().await
    }

    /// Sends the stop signal to all sub services and awaits for all services to shutdown.
    pub async fn send_stop_signal_and_await_shutdown(&self) -> anyhow::Result<State> {
        self.runner.stop_and_await().await
    }

    /// Returns the current state of the underlying `ServiceRunner`.
    pub fn state(&self) -> State {
        self.runner.state()
    }

    /// Returns the list of sub-services run by the `FuelService`.
    pub fn sub_services(&self) -> &SubServices {
        self.sub_services.as_ref()
    }
}
422
/// The set of dynamically dispatched sub-services owned by the `FuelService`.
pub type SubServices = Vec<Box<dyn ServiceTrait + Send + Sync + 'static>>;
424
/// The top-level task supervising all sub-services of the `FuelService`.
struct Task {
    /// The list of started sub services.
    services: Arc<SubServices>,
    /// The copies of the databases used by services.
    database: CombinedDatabase,
    /// The shared state of the service exposed through the `ServiceRunner`.
    pub shared: SharedState,
}
433
434impl Task {
435    /// Private inner method for initializing the fuel service task
436    pub fn new(
437        services: Arc<SubServices>,
438        database: CombinedDatabase,
439        shared: SharedState,
440    ) -> anyhow::Result<Task> {
441        Ok(Task {
442            services,
443            database,
444            shared,
445        })
446    }
447}
448
/// Parameters handed to the `Task` by the `ServiceRunner` when it starts.
#[derive(Default)]
struct TaskParams {
    /// Signal used to unblock block production once all sub-services have started.
    block_production_ready_signal: ReadySignal,
}
453
#[async_trait::async_trait]
impl RunnableService for Task {
    const NAME: &'static str = "FuelService";
    type SharedData = SharedState;
    type Task = Task;
    type TaskParams = TaskParams;

    fn shared_data(&self) -> Self::SharedData {
        self.shared.clone()
    }

    async fn into_task(
        mut self,
        watcher: &StateWatcher,
        params: Self::TaskParams,
    ) -> anyhow::Result<Self::Task> {
        let mut watcher = watcher.clone();

        // Start the sub-services one by one, in the order they were created.
        // A stop request interrupts the start-up sequence early.
        for service in self.services.iter() {
            tokio::select! {
                _ = watcher.wait_stopping_or_stopped() => {
                    break;
                }
                result = service.start_and_await() => {
                    result?;
                }
            }
        }

        // Unblock block production. Note: the signal is sent even when the
        // start-up loop above was interrupted by a stop request.
        params.block_production_ready_signal.send_ready_signal();

        Ok(self)
    }
}
488
489impl RunnableTask for Task {
490    #[tracing::instrument(skip_all)]
491    async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
492        let mut stop_signals = vec![];
493        for service in self.services.iter() {
494            stop_signals.push(service.await_stop())
495        }
496        stop_signals.push(Box::pin(watcher.while_started()));
497
498        let (result, _, _) = futures::future::select_all(stop_signals).await;
499
500        if let Err(err) = result {
501            tracing::error!("Got an error during listen for shutdown: {}", err);
502        }
503
504        // We received the stop signal from any of one source, so stop this service and
505        // all sub-services.
506        TaskNextAction::Stop
507    }
508
509    async fn shutdown(self) -> anyhow::Result<()> {
510        for service in self.services.iter() {
511            let result = service.stop_and_await().await;
512
513            if let Err(err) = result {
514                tracing::error!(
515                    "Got and error during awaiting for stop of the service: {}",
516                    err
517                );
518            }
519        }
520        self.database.shutdown();
521        Ok(())
522    }
523}
524
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use crate::{
        ShutdownListener,
        service::{
            Config,
            FuelService,
        },
    };
    use fuel_core_services::State;
    use std::{
        thread::sleep,
        time::Duration,
    };

    #[tokio::test]
    async fn stop_sub_service_shutdown_all_services() {
        // The test verifies that stopping any one of the sub-services shuts down
        // the whole `FuelService`. Each iteration starts a fresh service, stops
        // the `i`-th sub-service, and expects the service to shut down cleanly.
        let mut i = 0;
        loop {
            let mut shutdown = ShutdownListener::spawn();
            let service =
                FuelService::new(Default::default(), Config::local_node(), &mut shutdown)
                    .unwrap();
            service.start_and_await().await.unwrap();
            // Give all sub-services a moment to reach the `Started` state.
            sleep(Duration::from_secs(1));
            for service in service.sub_services() {
                assert_eq!(service.state(), State::Started);
            }

            if i < service.sub_services().len() {
                service.sub_services()[i].stop_and_await().await.unwrap();
                tokio::time::timeout(Duration::from_secs(5), service.await_shutdown())
                    .await
                    .expect("Failed to stop the service in reasonable period of time")
                    .expect("Failed to stop the service");
            } else {
                break;
            }
            i += 1;
        }

        // Current services:
        // -    tx status manager
        // -    graphql
        // -    graphql worker
        // -    txpool
        // -    PoA
        // -    gas price service
        // -    chain info provider
        #[allow(unused_mut)]
        let mut expected_services = 7;

        // Relayer service is disabled with `Config::local_node`.
        // #[cfg(feature = "relayer")]
        // {
        //     expected_services += 1;
        // }
        #[cfg(feature = "p2p")]
        {
            // p2p & sync & preconfirmation signature service
            expected_services += 3;
        }
        #[cfg(feature = "shared-sequencer")]
        {
            expected_services += 1;
        }

        // # Dev-note: Update the `expected_services` when we add/remove a new/old service.
        assert_eq!(i, expected_services);
    }

    #[tokio::test]
    async fn stop_and_await___stops_all_services() {
        let mut shutdown = ShutdownListener::spawn();
        let service =
            FuelService::new(Default::default(), Config::local_node(), &mut shutdown)
                .unwrap();
        service.start_and_await().await.unwrap();
        // Capture a state watcher per sub-service before stopping, so the final
        // state of each one can be inspected after shutdown.
        let sub_services_watchers: Vec<_> = service
            .sub_services()
            .iter()
            .map(|s| s.state_watcher())
            .collect();

        sleep(Duration::from_secs(1));
        for service in service.sub_services() {
            assert_eq!(service.state(), State::Started);
        }
        service.send_stop_signal_and_await_shutdown().await.unwrap();

        for mut service in sub_services_watchers {
            // Check that the state is `Stopped`(not `StoppedWithError`)
            assert_eq!(service.borrow_and_update().clone(), State::Stopped);
        }
    }
}
622}