1use std::{
2 net::SocketAddr,
3 sync::Arc,
4};
5
6use adapters::{
7 TxStatusManagerAdapter,
8 ready_signal::ReadySignal,
9};
10pub use config::{
11 Config,
12 DbType,
13 RelayerConsensusConfig,
14};
15use fuel_core_chain_config::{
16 ConsensusConfig,
17 GenesisCommitment,
18};
19use fuel_core_poa::{
20 ports::BlockImporter,
21 verifier::verify_consensus,
22};
23pub use fuel_core_services::Service as ServiceTrait;
24use fuel_core_services::{
25 RunnableService,
26 RunnableTask,
27 ServiceRunner,
28 State,
29 StateWatcher,
30 TaskNextAction,
31};
32use fuel_core_storage::{
33 IsNotFound,
34 StorageAsMut,
35 not_found,
36 tables::SealedBlockConsensus,
37 transactional::{
38 AtomicView,
39 ReadTransaction,
40 StorageChanges,
41 },
42};
43use fuel_core_types::{
44 blockchain::consensus::Consensus,
45 fuel_types::BlockHeight,
46};
47
48use self::adapters::BlockImporterAdapter;
49use crate::{
50 combined_database::{
51 CombinedDatabase,
52 ShutdownListener,
53 },
54 database::Database,
55 service::{
56 adapters::{
57 ExecutorAdapter,
58 PoAAdapter,
59 },
60 sub_services::TxPoolSharedState,
61 },
62};
63
64#[cfg(feature = "rpc")]
65use crate::database::database_description::block_aggregator::BlockAggregatorDatabase;
66#[cfg(feature = "rpc")]
67use fuel_core_block_aggregator_api::db::storage_or_remote_db::StorageOrRemoteBlocksProvider;
68
69pub mod adapters;
70pub mod config;
71pub mod genesis;
72pub mod metrics;
73mod query;
74pub mod sub_services;
75pub mod vm_pool;
76
/// Handles to the state shared by the node's sub-services.
///
/// The struct is `Clone`; `FuelService::new` keeps one copy on the
/// `FuelService` and hands another to the internal `Task`.
#[derive(Clone)]
pub struct SharedState {
    /// Adapter for the PoA consensus module.
    pub poa_adapter: PoAAdapter,
    /// Shared state of the transaction pool.
    pub txpool_shared_state: TxPoolSharedState,
    /// Adapter for the transaction status manager.
    pub tx_status_manager: TxStatusManagerAdapter,
    /// Shared state of the P2P service. The field only exists when the crate
    /// is built with the `p2p` feature.
    #[cfg(feature = "p2p")]
    pub network: Option<fuel_core_p2p::service::SharedState>,
    /// Shared state of the relayer. The field only exists when the crate is
    /// built with the `relayer` feature. When it is `None`,
    /// `FuelService::await_relayer_synced` returns immediately.
    #[cfg(feature = "relayer")]
    pub relayer: Option<fuel_core_relayer::SharedState>,
    /// Shared state of the GraphQL API service. Its `bound_address` is what
    /// `FuelService::bound_address` is initialised from.
    pub graph_ql: crate::fuel_core_graphql_api::api_service::SharedState,
    /// The combined database used by the node.
    pub database: CombinedDatabase,
    /// Adapter for the block importer; used to commit the genesis block
    /// result during start-up.
    pub block_importer: BlockImporterAdapter,
    /// Adapter for the block executor.
    pub executor: ExecutorAdapter,
    /// The configuration the service was created with.
    pub config: Config,
    /// Shared data of the compression service. When it is `None`,
    /// `FuelService::await_compression_synced_until` returns immediately.
    pub compression: Option<fuel_core_compression_service::service::SharedData>,
    /// Shared data of the gas price service.
    pub gas_price_service: fuel_core_gas_price_service::v1::service::SharedData,
    /// Shared state of the block aggregator RPC service. The field only
    /// exists when the crate is built with the `rpc` feature; its
    /// `bound_address` is what `FuelService::rpc_address` is initialised from.
    #[cfg(feature = "rpc")]
    pub block_aggregator_rpc: Option<
        fuel_core_block_aggregator_api::service::SharedState<
            StorageOrRemoteBlocksProvider<Database<BlockAggregatorDatabase>>,
        >,
    >,
}
113
/// The top-level node service: owns the runner that starts, supervises and
/// stops all sub-services.
///
/// Dropping a `FuelService` sends the stop signal (see the `Drop` impl).
pub struct FuelService {
    /// Runner driving the internal `Task`. It is private; the parts of its
    /// API that callers need are re-exposed as `FuelService` methods.
    runner: ServiceRunner<Task>,
    /// The sub-services managed by this node, shared with the internal `Task`.
    pub sub_services: Arc<SubServices>,
    /// Handles to the state shared by the sub-services.
    pub shared: SharedState,
    /// Address taken from the GraphQL shared state (`graph_ql.bound_address`)
    /// when the service was constructed.
    pub bound_address: SocketAddr,
    /// Address taken from the block aggregator RPC shared state, if that
    /// state is present. Only exists with the `rpc` feature.
    #[cfg(feature = "rpc")]
    pub rpc_address: Option<SocketAddr>,
}
131
132impl Drop for FuelService {
133 fn drop(&mut self) {
134 self.send_stop_signal();
135 }
136}
137
138impl FuelService {
139 #[tracing::instrument(skip_all, fields(name = %config.name))]
141 pub fn new<Shutdown>(
142 mut database: CombinedDatabase,
143 config: Config,
144 shutdown_listener: &mut Shutdown,
145 ) -> anyhow::Result<Self>
146 where
147 Shutdown: ShutdownListener,
148 {
149 let config = config.make_config_consistent();
150
151 tracing::info!("Initializing database");
153 database.check_version()?;
154
155 Self::make_database_compatible_with_config(
156 &mut database,
157 &config,
158 shutdown_listener,
159 )?;
160
161 database.sync_aux_db_heights(shutdown_listener)?;
163
164 let block_production_ready_signal = ReadySignal::new();
165
166 let (services, shared) = sub_services::init_sub_services(
167 &config,
168 database.clone(),
169 block_production_ready_signal.clone(),
170 )?;
171
172 let sub_services = Arc::new(services);
173 let task = Task::new(sub_services.clone(), database, shared.clone())?;
174 let runner = ServiceRunner::new_with_params(
175 task,
176 TaskParams {
177 block_production_ready_signal,
178 },
179 );
180 let bound_address = runner.shared.graph_ql.bound_address;
181
182 #[cfg(feature = "rpc")]
183 let rpc_address = runner
184 .shared
185 .block_aggregator_rpc
186 .clone()
187 .map(|state| state.bound_address);
188
189 Ok(FuelService {
190 sub_services,
191 bound_address,
192 shared,
193 runner,
194 #[cfg(feature = "rpc")]
195 rpc_address,
196 })
197 }
198
199 pub async fn new_node(config: Config) -> anyhow::Result<Self> {
201 let combined_database =
203 CombinedDatabase::from_config(&config.combined_db_config)?;
204
205 Self::from_combined_database(combined_database, config).await
206 }
207
208 pub async fn from_database(
210 database: Database,
211 config: Config,
212 ) -> anyhow::Result<Self> {
213 let combined_database = CombinedDatabase::new(
214 database,
215 Default::default(),
216 Default::default(),
217 Default::default(),
218 Default::default(),
219 #[cfg(feature = "rpc")]
220 Default::default(),
221 );
222 Self::from_combined_database(combined_database, config).await
223 }
224
225 pub async fn from_combined_database(
227 combined_database: CombinedDatabase,
228 config: Config,
229 ) -> anyhow::Result<Self> {
230 let mut listener = crate::ShutdownListener::spawn();
231 let service = Self::new(combined_database, config, &mut listener)?;
232 let state = service.start_and_await().await?;
233
234 if !state.started() {
235 return Err(anyhow::anyhow!(
236 "The state of the service is not started: {state:?}"
237 ));
238 }
239 Ok(service)
240 }
241
/// Waits on the relayer's `await_synced`, if relayer shared state is present.
///
/// Returns `Ok(())` immediately when there is no relayer shared state.
#[cfg(feature = "relayer")]
pub async fn await_relayer_synced(&self) -> anyhow::Result<()> {
    let Some(relayer) = self.runner.shared.relayer.as_ref() else {
        return Ok(());
    };
    relayer.await_synced().await?;
    Ok(())
}
260
261 pub async fn await_compression_synced_until(
264 &self,
265 block_height: &BlockHeight,
266 ) -> anyhow::Result<()> {
267 if let Some(sync_observer) = &self.runner.shared.compression {
268 sync_observer.await_synced_until(block_height).await?;
269 }
270 Ok(())
271 }
272
273 pub async fn await_gas_price_synced(&self) -> anyhow::Result<()> {
276 let _ = &self.runner.shared.gas_price_service.await_synced().await?;
277 Ok(())
278 }
279
280 fn make_database_compatible_with_config<Shutdown>(
281 combined_database: &mut CombinedDatabase,
282 config: &Config,
283 shutdown_listener: &mut Shutdown,
284 ) -> anyhow::Result<()>
285 where
286 Shutdown: ShutdownListener,
287 {
288 let start_up_consensus_config = &config.snapshot_reader.chain_config().consensus;
289
290 let mut found_override_height = None;
291 match start_up_consensus_config {
292 ConsensusConfig::PoA { .. } => {
293 }
295 ConsensusConfig::PoAV2(poa) => {
296 let on_chain_view = combined_database.on_chain().latest_view()?;
297
298 for override_height in poa.get_all_overrides().keys() {
299 let Some(current_height) = on_chain_view.maybe_latest_height()?
300 else {
301 return Ok(());
303 };
304
305 if override_height > ¤t_height {
306 return Ok(());
307 }
308
309 let block_header = on_chain_view
310 .get_sealed_block_header(override_height)?
311 .ok_or(not_found!("SealedBlockHeader"))?;
312 let header = block_header.entity;
313 let seal = block_header.consensus;
314
315 if let Consensus::PoA(poa_seal) = seal {
316 let block_valid = verify_consensus(
317 start_up_consensus_config,
318 &header,
319 &poa_seal,
320 );
321
322 if !block_valid {
323 found_override_height = Some(override_height);
324 }
325 } else {
326 return Err(anyhow::anyhow!(
327 "The consensus at override height {override_height} is not PoA."
328 ));
329 };
330 }
331 }
332 }
333
334 if let Some(override_height) = found_override_height {
335 let rollback_height = override_height.pred().ok_or(anyhow::anyhow!(
336 "The override height is zero. \
337 The override height should be greater than zero."
338 ))?;
339 tracing::warn!(
340 "The consensus at override height {override_height} \
341 does not match with the database. \
342 Rollbacking the database to the height {rollback_height}"
343 );
344 combined_database.rollback_to(rollback_height, shutdown_listener)?;
345 }
346
347 Ok(())
348 }
349
350 fn override_chain_config_if_needed(&self) -> anyhow::Result<()> {
351 let chain_config = self.shared.config.snapshot_reader.chain_config();
352 let on_chain_view = self.shared.database.on_chain().latest_view()?;
353 let chain_config_hash = chain_config.root()?.into();
354 let mut initialized_genesis = on_chain_view.get_genesis()?;
355 let genesis_chain_config_hash = initialized_genesis.chain_config_hash;
356
357 if genesis_chain_config_hash != chain_config_hash {
358 tracing::warn!(
359 "The genesis chain config hash({genesis_chain_config_hash}) \
360 is different from the current one({chain_config_hash}). \
361 Updating the genesis consensus parameters."
362 );
363
364 let genesis_block_height =
365 on_chain_view.genesis_height()?.ok_or(anyhow::anyhow!(
366 "The genesis block height is not found in the database \
367 during overriding the chain config hash."
368 ))?;
369 let mut database_tx = on_chain_view.read_transaction();
370
371 initialized_genesis.chain_config_hash = chain_config_hash;
372 database_tx
373 .storage_as_mut::<SealedBlockConsensus>()
374 .insert(
375 &genesis_block_height,
376 &Consensus::Genesis(initialized_genesis),
377 )?;
378
379 self.shared.database.on_chain().data.commit_changes(
380 Some(genesis_block_height),
381 StorageChanges::Changes(database_tx.into_changes()),
382 )?;
383 }
384
385 Ok(())
386 }
387
388 async fn prepare_genesis(&self, watcher: &StateWatcher) -> anyhow::Result<()> {
389 if let Err(err) = self.shared.database.on_chain().latest_view()?.get_genesis()
391 && err.is_not_found()
392 {
393 let result = genesis::execute_genesis_block(
394 watcher.clone(),
395 &self.shared.config,
396 &self.shared.database,
397 )
398 .await?;
399
400 self.shared.block_importer.commit_result(result).await?;
401 }
402
403 genesis::recover_missing_tables_from_genesis_state_config(
405 watcher.clone(),
406 &self.shared.config,
407 &self.shared.database,
408 )
409 .await?;
410
411 self.override_chain_config_if_needed()
412 }
413}
414
415impl FuelService {
416 pub async fn start_and_await(&self) -> anyhow::Result<State> {
418 let watcher = self.runner.state_watcher();
419 self.prepare_genesis(&watcher).await?;
420 self.runner.start_and_await().await
421 }
422
423 pub fn send_stop_signal(&self) -> bool {
425 self.runner.stop()
426 }
427
428 pub async fn await_shutdown(&self) -> anyhow::Result<State> {
430 self.runner.await_stop().await
431 }
432
433 pub async fn send_stop_signal_and_await_shutdown(&self) -> anyhow::Result<State> {
435 self.runner.stop_and_await().await
436 }
437
438 pub fn state(&self) -> State {
439 self.runner.state()
440 }
441
442 pub fn sub_services(&self) -> &SubServices {
443 self.sub_services.as_ref()
444 }
445}
446
/// The list of sub-services managed by `FuelService`, stored as boxed
/// `ServiceTrait` objects.
pub type SubServices = Vec<Box<dyn ServiceTrait + Send + Sync + 'static>>;
448
/// The runnable behind `FuelService`: starts the sub-services, waits for the
/// first stop, and shuts everything down.
struct Task {
    /// The sub-services, started in `into_task` and stopped in `shutdown`.
    services: Arc<SubServices>,
    /// The combined database; `shutdown` calls `shutdown()` on it after the
    /// sub-services were asked to stop.
    database: CombinedDatabase,
    /// The shared state returned from `shared_data`.
    pub shared: SharedState,
}
457
458impl Task {
459 pub fn new(
461 services: Arc<SubServices>,
462 database: CombinedDatabase,
463 shared: SharedState,
464 ) -> anyhow::Result<Task> {
465 Ok(Task {
466 services,
467 database,
468 shared,
469 })
470 }
471}
472
/// Parameters handed to `Task::into_task` by the service runner.
#[derive(Default)]
struct TaskParams {
    /// Signal that `into_task` fires via `send_ready_signal` once it is done
    /// starting the sub-services.
    block_production_ready_signal: ReadySignal,
}
477
478#[async_trait::async_trait]
479impl RunnableService for Task {
480 const NAME: &'static str = "FuelService";
481 type SharedData = SharedState;
482 type Task = Task;
483 type TaskParams = TaskParams;
484
485 fn shared_data(&self) -> Self::SharedData {
486 self.shared.clone()
487 }
488
489 async fn into_task(
490 mut self,
491 watcher: &StateWatcher,
492 params: Self::TaskParams,
493 ) -> anyhow::Result<Self::Task> {
494 let mut watcher = watcher.clone();
495
496 for service in self.services.iter() {
497 tokio::select! {
498 _ = watcher.wait_stopping_or_stopped() => {
499 break;
500 }
501 result = service.start_and_await() => {
502 result?;
503 }
504 }
505 }
506
507 params.block_production_ready_signal.send_ready_signal();
508
509 Ok(self)
510 }
511}
512
513impl RunnableTask for Task {
514 #[tracing::instrument(skip_all)]
515 async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
516 let mut stop_signals = vec![];
517 for service in self.services.iter() {
518 stop_signals.push(service.await_stop())
519 }
520 stop_signals.push(Box::pin(watcher.while_started()));
521
522 let (result, _, _) = futures::future::select_all(stop_signals).await;
523
524 if let Err(err) = result {
525 tracing::error!("Got an error during listen for shutdown: {}", err);
526 }
527
528 TaskNextAction::Stop
531 }
532
533 async fn shutdown(self) -> anyhow::Result<()> {
534 for service in self.services.iter() {
535 let result = service.stop_and_await().await;
536
537 if let Err(err) = result {
538 tracing::error!(
539 "Got and error during awaiting for stop of the service: {}",
540 err
541 );
542 }
543 }
544 self.database.shutdown();
545 Ok(())
546 }
547}
548
// `non_snake_case` is allowed because test names use `___` to separate the
// action from the expectation.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use crate::{
        ShutdownListener,
        service::{
            Config,
            FuelService,
        },
    };
    use fuel_core_services::State;
    use std::time::Duration;

    /// How long a freshly started node is left running before the tests
    /// assert that every sub-service is still `Started`.
    const SETTLE_TIME: Duration = Duration::from_secs(1);

    /// Stopping any single sub-service must bring the whole `FuelService`
    /// down. Iteration `i` starts a fresh node and stops its `i`-th
    /// sub-service; the loop ends once `i` has run past the last one.
    #[tokio::test]
    async fn stop_sub_service_shutdown_all_services() {
        let mut i = 0;
        loop {
            let mut shutdown = ShutdownListener::spawn();
            #[cfg(not(feature = "rpc"))]
            let config = Config::local_node();
            #[cfg(feature = "rpc")]
            let config = Config::local_node_with_rpc();
            let service =
                FuelService::new(Default::default(), config, &mut shutdown).unwrap();
            service.start_and_await().await.unwrap();

            // Sleep asynchronously: a blocking `std::thread::sleep` would
            // stall the test runtime instead of letting the services run.
            tokio::time::sleep(SETTLE_TIME).await;
            for sub_service in service.sub_services() {
                assert_eq!(sub_service.state(), State::Started);
            }

            if i >= service.sub_services().len() {
                break;
            }

            service.sub_services()[i].stop_and_await().await.unwrap();
            tokio::time::timeout(Duration::from_secs(5), service.await_shutdown())
                .await
                .expect("Failed to stop the service in reasonable period of time")
                .expect("Failed to stop the service");
            i += 1;
        }

        // Expected number of sub-services for the enabled feature set.
        // NOTE(review): these counts presumably mirror what
        // `init_sub_services` creates; keep them in sync with it.
        //
        // `unused_mut` is allowed on both variants because the binding is
        // only mutated when `p2p` or `shared-sequencer` is enabled.
        #[allow(unused_mut)]
        #[cfg(not(feature = "rpc"))]
        let mut expected_services = 7;
        #[allow(unused_mut)]
        #[cfg(feature = "rpc")]
        let mut expected_services = 8;

        #[cfg(feature = "p2p")]
        {
            expected_services += 3;
        }
        #[cfg(feature = "shared-sequencer")]
        {
            expected_services += 1;
        }

        assert_eq!(i, expected_services);
    }

    /// Stopping the `FuelService` itself must leave every sub-service in the
    /// `Stopped` state.
    #[tokio::test]
    async fn stop_and_await___stops_all_services() {
        let mut shutdown = ShutdownListener::spawn();
        let service =
            FuelService::new(Default::default(), Config::local_node(), &mut shutdown)
                .unwrap();
        service.start_and_await().await.unwrap();
        let sub_services_watchers: Vec<_> = service
            .sub_services()
            .iter()
            .map(|s| s.state_watcher())
            .collect();

        // Sleep asynchronously so the runtime keeps driving the services.
        tokio::time::sleep(SETTLE_TIME).await;
        for sub_service in service.sub_services() {
            assert_eq!(sub_service.state(), State::Started);
        }
        service.send_stop_signal_and_await_shutdown().await.unwrap();

        for mut sub_service_watcher in sub_services_watchers {
            assert_eq!(
                sub_service_watcher.borrow_and_update().clone(),
                State::Stopped
            );
        }
    }
}