1use std::{
2 net::SocketAddr,
3 sync::Arc,
4};
5
6use adapters::{
7 TxStatusManagerAdapter,
8 ready_signal::ReadySignal,
9};
10pub use config::{
11 Config,
12 DbType,
13 RelayerConsensusConfig,
14};
15use fuel_core_chain_config::{
16 ConsensusConfig,
17 GenesisCommitment,
18};
19use fuel_core_poa::{
20 ports::BlockImporter,
21 verifier::verify_consensus,
22};
23pub use fuel_core_services::Service as ServiceTrait;
24use fuel_core_services::{
25 RunnableService,
26 RunnableTask,
27 ServiceRunner,
28 State,
29 StateWatcher,
30 TaskNextAction,
31};
32use fuel_core_storage::{
33 IsNotFound,
34 StorageAsMut,
35 not_found,
36 tables::SealedBlockConsensus,
37 transactional::{
38 AtomicView,
39 ReadTransaction,
40 StorageChanges,
41 },
42};
43use fuel_core_types::{
44 blockchain::consensus::Consensus,
45 fuel_types::BlockHeight,
46};
47
48use crate::{
49 combined_database::{
50 CombinedDatabase,
51 ShutdownListener,
52 },
53 database::Database,
54 service::{
55 adapters::{
56 ExecutorAdapter,
57 PoAAdapter,
58 },
59 sub_services::TxPoolSharedState,
60 },
61};
62
63use self::adapters::BlockImporterAdapter;
64
65pub mod adapters;
66pub mod config;
67pub mod genesis;
68pub mod metrics;
69mod query;
70pub mod sub_services;
71pub mod vm_pool;
72
/// State shared by the node's top-level service.
///
/// It is produced together with the sub-services by `sub_services::init_sub_services`
/// and handed out (cloned) by the task's `shared_data`.
#[derive(Clone)]
pub struct SharedState {
    /// The PoA adapter.
    pub poa_adapter: PoAAdapter,
    /// Shared state of the transaction pool.
    pub txpool_shared_state: TxPoolSharedState,
    /// Adapter for the transaction status manager.
    pub tx_status_manager: TxStatusManagerAdapter,
    /// Shared state of the P2P service, if any.
    /// The field only exists when the `p2p` feature is enabled.
    #[cfg(feature = "p2p")]
    pub network: Option<fuel_core_p2p::service::SharedState>,
    /// Shared state of the relayer, if any; awaited by `await_relayer_synced`.
    /// The field only exists when the `relayer` feature is enabled.
    #[cfg(feature = "relayer")]
    pub relayer: Option<fuel_core_relayer::SharedState>,
    /// Shared state of the GraphQL API service; its `bound_address` becomes
    /// `FuelService::bound_address`.
    pub graph_ql: crate::fuel_core_graphql_api::api_service::SharedState,
    /// Handle to the combined database.
    pub database: CombinedDatabase,
    /// Block importer adapter; used to commit the result of the genesis block execution.
    pub block_importer: BlockImporterAdapter,
    /// The executor adapter.
    pub executor: ExecutorAdapter,
    /// Configuration of the service.
    pub config: Config,
    /// Shared data of the compression service, if any; used by
    /// `await_compression_synced_until`.
    pub compression: Option<fuel_core_compression_service::service::SharedData>,
    /// Shared data of the gas price service (v1); used by `await_gas_price_synced`.
    pub gas_price_service: fuel_core_gas_price_service::v1::service::SharedData,
}
102
/// Handle to a Fuel node: the service runner plus its sub-services and shared state.
///
/// Dropping the value sends a stop signal to the runner (see the `Drop` impl).
pub struct FuelService {
    /// Runner that drives the top-level `Task`.
    runner: ServiceRunner<Task>,
    /// The sub-services created during initialization; the same `Arc` is held by the task.
    pub sub_services: Arc<SubServices>,
    /// Shared state created together with the sub-services.
    pub shared: SharedState,
    /// Address copied from the GraphQL shared state (`graph_ql.bound_address`)
    /// when the service is constructed.
    pub bound_address: SocketAddr,
}
117
118impl Drop for FuelService {
119 fn drop(&mut self) {
120 self.send_stop_signal();
121 }
122}
123
124impl FuelService {
125 #[tracing::instrument(skip_all, fields(name = %config.name))]
127 pub fn new<Shutdown>(
128 mut database: CombinedDatabase,
129 config: Config,
130 shutdown_listener: &mut Shutdown,
131 ) -> anyhow::Result<Self>
132 where
133 Shutdown: ShutdownListener,
134 {
135 let config = config.make_config_consistent();
136
137 tracing::info!("Initializing database");
139 database.check_version()?;
140
141 Self::make_database_compatible_with_config(
142 &mut database,
143 &config,
144 shutdown_listener,
145 )?;
146
147 tracing::info!("Initializing sub services");
149 database.sync_aux_db_heights(shutdown_listener)?;
150
151 let block_production_ready_signal = ReadySignal::new();
152
153 let (services, shared) = sub_services::init_sub_services(
154 &config,
155 database.clone(),
156 block_production_ready_signal.clone(),
157 )?;
158
159 let sub_services = Arc::new(services);
160 let task = Task::new(sub_services.clone(), database, shared.clone())?;
161 let runner = ServiceRunner::new_with_params(
162 task,
163 TaskParams {
164 block_production_ready_signal,
165 },
166 );
167 let bound_address = runner.shared.graph_ql.bound_address;
168
169 Ok(FuelService {
170 sub_services,
171 bound_address,
172 shared,
173 runner,
174 })
175 }
176
177 pub async fn new_node(config: Config) -> anyhow::Result<Self> {
179 let combined_database =
181 CombinedDatabase::from_config(&config.combined_db_config)?;
182
183 Self::from_combined_database(combined_database, config).await
184 }
185
186 pub async fn from_database(
188 database: Database,
189 config: Config,
190 ) -> anyhow::Result<Self> {
191 let combined_database = CombinedDatabase::new(
192 database,
193 Default::default(),
194 Default::default(),
195 Default::default(),
196 Default::default(),
197 );
198 Self::from_combined_database(combined_database, config).await
199 }
200
201 pub async fn from_combined_database(
203 combined_database: CombinedDatabase,
204 config: Config,
205 ) -> anyhow::Result<Self> {
206 let mut listener = crate::ShutdownListener::spawn();
207 let service = Self::new(combined_database, config, &mut listener)?;
208 let state = service.start_and_await().await?;
209
210 if !state.started() {
211 return Err(anyhow::anyhow!(
212 "The state of the service is not started: {state:?}"
213 ));
214 }
215 Ok(service)
216 }
217
218 #[cfg(feature = "relayer")]
219 pub async fn await_relayer_synced(&self) -> anyhow::Result<()> {
231 if let Some(relayer_handle) = &self.runner.shared.relayer {
232 relayer_handle.await_synced().await?;
233 }
234 Ok(())
235 }
236
237 pub async fn await_compression_synced_until(
240 &self,
241 block_height: &BlockHeight,
242 ) -> anyhow::Result<()> {
243 if let Some(sync_observer) = &self.runner.shared.compression {
244 sync_observer.await_synced_until(block_height).await?;
245 }
246 Ok(())
247 }
248
249 pub async fn await_gas_price_synced(&self) -> anyhow::Result<()> {
252 let _ = &self.runner.shared.gas_price_service.await_synced().await?;
253 Ok(())
254 }
255
256 fn make_database_compatible_with_config<Shutdown>(
257 combined_database: &mut CombinedDatabase,
258 config: &Config,
259 shutdown_listener: &mut Shutdown,
260 ) -> anyhow::Result<()>
261 where
262 Shutdown: ShutdownListener,
263 {
264 let start_up_consensus_config = &config.snapshot_reader.chain_config().consensus;
265
266 let mut found_override_height = None;
267 match start_up_consensus_config {
268 ConsensusConfig::PoA { .. } => {
269 }
271 ConsensusConfig::PoAV2(poa) => {
272 let on_chain_view = combined_database.on_chain().latest_view()?;
273
274 for override_height in poa.get_all_overrides().keys() {
275 let Some(current_height) = on_chain_view.maybe_latest_height()?
276 else {
277 return Ok(());
279 };
280
281 if override_height > ¤t_height {
282 return Ok(());
283 }
284
285 let block_header = on_chain_view
286 .get_sealed_block_header(override_height)?
287 .ok_or(not_found!("SealedBlockHeader"))?;
288 let header = block_header.entity;
289 let seal = block_header.consensus;
290
291 if let Consensus::PoA(poa_seal) = seal {
292 let block_valid = verify_consensus(
293 start_up_consensus_config,
294 &header,
295 &poa_seal,
296 );
297
298 if !block_valid {
299 found_override_height = Some(override_height);
300 }
301 } else {
302 return Err(anyhow::anyhow!(
303 "The consensus at override height {override_height} is not PoA."
304 ));
305 };
306 }
307 }
308 }
309
310 if let Some(override_height) = found_override_height {
311 let rollback_height = override_height.pred().ok_or(anyhow::anyhow!(
312 "The override height is zero. \
313 The override height should be greater than zero."
314 ))?;
315 tracing::warn!(
316 "The consensus at override height {override_height} \
317 does not match with the database. \
318 Rollbacking the database to the height {rollback_height}"
319 );
320 combined_database.rollback_to(rollback_height, shutdown_listener)?;
321 }
322
323 Ok(())
324 }
325
326 fn override_chain_config_if_needed(&self) -> anyhow::Result<()> {
327 let chain_config = self.shared.config.snapshot_reader.chain_config();
328 let on_chain_view = self.shared.database.on_chain().latest_view()?;
329 let chain_config_hash = chain_config.root()?.into();
330 let mut initialized_genesis = on_chain_view.get_genesis()?;
331 let genesis_chain_config_hash = initialized_genesis.chain_config_hash;
332
333 if genesis_chain_config_hash != chain_config_hash {
334 tracing::warn!(
335 "The genesis chain config hash({genesis_chain_config_hash}) \
336 is different from the current one({chain_config_hash}). \
337 Updating the genesis consensus parameters."
338 );
339
340 let genesis_block_height =
341 on_chain_view.genesis_height()?.ok_or(anyhow::anyhow!(
342 "The genesis block height is not found in the database \
343 during overriding the chain config hash."
344 ))?;
345 let mut database_tx = on_chain_view.read_transaction();
346
347 initialized_genesis.chain_config_hash = chain_config_hash;
348 database_tx
349 .storage_as_mut::<SealedBlockConsensus>()
350 .insert(
351 &genesis_block_height,
352 &Consensus::Genesis(initialized_genesis),
353 )?;
354
355 self.shared.database.on_chain().data.commit_changes(
356 Some(genesis_block_height),
357 StorageChanges::Changes(database_tx.into_changes()),
358 )?;
359 }
360
361 Ok(())
362 }
363
364 async fn prepare_genesis(&self, watcher: &StateWatcher) -> anyhow::Result<()> {
365 if let Err(err) = self.shared.database.on_chain().latest_view()?.get_genesis()
367 && err.is_not_found()
368 {
369 let result = genesis::execute_genesis_block(
370 watcher.clone(),
371 &self.shared.config,
372 &self.shared.database,
373 )
374 .await?;
375
376 self.shared.block_importer.commit_result(result).await?;
377 }
378
379 genesis::recover_missing_tables_from_genesis_state_config(
381 watcher.clone(),
382 &self.shared.config,
383 &self.shared.database,
384 )
385 .await?;
386
387 self.override_chain_config_if_needed()
388 }
389}
390
391impl FuelService {
392 pub async fn start_and_await(&self) -> anyhow::Result<State> {
394 let watcher = self.runner.state_watcher();
395 self.prepare_genesis(&watcher).await?;
396 self.runner.start_and_await().await
397 }
398
399 pub fn send_stop_signal(&self) -> bool {
401 self.runner.stop()
402 }
403
404 pub async fn await_shutdown(&self) -> anyhow::Result<State> {
406 self.runner.await_stop().await
407 }
408
409 pub async fn send_stop_signal_and_await_shutdown(&self) -> anyhow::Result<State> {
411 self.runner.stop_and_await().await
412 }
413
414 pub fn state(&self) -> State {
415 self.runner.state()
416 }
417
418 pub fn sub_services(&self) -> &SubServices {
419 self.sub_services.as_ref()
420 }
421}
422
/// The collection of sub-services of the node: boxed `ServiceTrait` objects
/// that are `Send + Sync + 'static`.
pub type SubServices = Vec<Box<dyn ServiceTrait + Send + Sync + 'static>>;
424
/// The top-level task executed by the `ServiceRunner` of `FuelService`.
struct Task {
    /// The sub-services that are started in `into_task` and stopped in `shutdown`.
    services: Arc<SubServices>,
    /// The combined database; shut down after all sub-services have been stopped.
    database: CombinedDatabase,
    /// The shared state handed out (cloned) by `shared_data`.
    pub shared: SharedState,
}
433
434impl Task {
435 pub fn new(
437 services: Arc<SubServices>,
438 database: CombinedDatabase,
439 shared: SharedState,
440 ) -> anyhow::Result<Task> {
441 Ok(Task {
442 services,
443 database,
444 shared,
445 })
446 }
447}
448
/// Parameters passed to `Task::into_task` by the service runner.
#[derive(Default)]
struct TaskParams {
    /// Signal that is fired in `into_task` after the loop that starts the sub-services.
    block_production_ready_signal: ReadySignal,
}
453
454#[async_trait::async_trait]
455impl RunnableService for Task {
456 const NAME: &'static str = "FuelService";
457 type SharedData = SharedState;
458 type Task = Task;
459 type TaskParams = TaskParams;
460
461 fn shared_data(&self) -> Self::SharedData {
462 self.shared.clone()
463 }
464
465 async fn into_task(
466 mut self,
467 watcher: &StateWatcher,
468 params: Self::TaskParams,
469 ) -> anyhow::Result<Self::Task> {
470 let mut watcher = watcher.clone();
471
472 for service in self.services.iter() {
473 tokio::select! {
474 _ = watcher.wait_stopping_or_stopped() => {
475 break;
476 }
477 result = service.start_and_await() => {
478 result?;
479 }
480 }
481 }
482
483 params.block_production_ready_signal.send_ready_signal();
484
485 Ok(self)
486 }
487}
488
489impl RunnableTask for Task {
490 #[tracing::instrument(skip_all)]
491 async fn run(&mut self, watcher: &mut StateWatcher) -> TaskNextAction {
492 let mut stop_signals = vec![];
493 for service in self.services.iter() {
494 stop_signals.push(service.await_stop())
495 }
496 stop_signals.push(Box::pin(watcher.while_started()));
497
498 let (result, _, _) = futures::future::select_all(stop_signals).await;
499
500 if let Err(err) = result {
501 tracing::error!("Got an error during listen for shutdown: {}", err);
502 }
503
504 TaskNextAction::Stop
507 }
508
509 async fn shutdown(self) -> anyhow::Result<()> {
510 for service in self.services.iter() {
511 let result = service.stop_and_await().await;
512
513 if let Err(err) = result {
514 tracing::error!(
515 "Got and error during awaiting for stop of the service: {}",
516 err
517 );
518 }
519 }
520 self.database.shutdown();
521 Ok(())
522 }
523}
524
// `non_snake_case` is allowed because a test name below uses a triple underscore.
#[allow(non_snake_case)]
#[cfg(test)]
mod tests {
    use crate::{
        ShutdownListener,
        service::{
            Config,
            FuelService,
        },
    };
    use fuel_core_services::State;
    // NOTE(review): `sleep` is `std::thread::sleep`. Inside the async tests below
    // it blocks the executor thread instead of yielding to it; consider
    // `tokio::time::sleep(..).await` — confirm the tests still behave as intended.
    use std::{
        thread::sleep,
        time::Duration,
    };

    /// Stopping any single sub-service must bring the whole `FuelService` down.
    ///
    /// Iteration `i` starts a fresh local node, stops the sub-service with index `i`
    /// and expects the whole service to shut down within five seconds.
    #[tokio::test]
    async fn stop_sub_service_shutdown_all_services() {
        // Index of the sub-service to stop; equals the number of sub-services
        // once the loop has finished.
        let mut i = 0;
        loop {
            let mut shutdown = ShutdownListener::spawn();
            let service =
                FuelService::new(Default::default(), Config::local_node(), &mut shutdown)
                    .unwrap();
            service.start_and_await().await.unwrap();
            sleep(Duration::from_secs(1));
            for service in service.sub_services() {
                assert_eq!(service.state(), State::Started);
            }

            if i < service.sub_services().len() {
                service.sub_services()[i].stop_and_await().await.unwrap();
                tokio::time::timeout(Duration::from_secs(5), service.await_shutdown())
                    .await
                    .expect("Failed to stop the service in reasonable period of time")
                    .expect("Failed to stop the service");
            } else {
                break;
            }
            i += 1;
        }

        // Expected number of sub-services of a local node: 7 by default, plus the
        // feature-dependent ones added below. `mut` is unused when neither feature
        // is enabled, hence the `allow`.
        #[allow(unused_mut)]
        let mut expected_services = 7;

        #[cfg(feature = "p2p")]
        {
            expected_services += 3;
        }
        #[cfg(feature = "shared-sequencer")]
        {
            expected_services += 1;
        }

        // Fails when a sub-service is added or removed without updating the
        // numbers above.
        assert_eq!(i, expected_services);
    }

    /// Stopping the `FuelService` itself must leave every sub-service in `State::Stopped`.
    #[tokio::test]
    async fn stop_and_await___stops_all_services() {
        let mut shutdown = ShutdownListener::spawn();
        let service =
            FuelService::new(Default::default(), Config::local_node(), &mut shutdown)
                .unwrap();
        service.start_and_await().await.unwrap();
        // Take the watchers before stopping so the final states can be inspected afterwards.
        let sub_services_watchers: Vec<_> = service
            .sub_services()
            .iter()
            .map(|s| s.state_watcher())
            .collect();

        sleep(Duration::from_secs(1));
        for service in service.sub_services() {
            assert_eq!(service.state(), State::Started);
        }
        service.send_stop_signal_and_await_shutdown().await.unwrap();

        for mut service in sub_services_watchers {
            assert_eq!(service.borrow_and_update().clone(), State::Stopped);
        }
    }
}
622}