use crate::database::Database;
use anyhow::Error as AnyError;
use futures::FutureExt;
use modules::Modules;
use std::{
net::SocketAddr,
panic,
};
use tokio::{
sync::oneshot,
task::JoinHandle,
};
use tracing::log::warn;
pub use config::{
Config,
DbType,
VMConfig,
};
pub mod adapters;
pub mod config;
pub(crate) mod genesis;
pub mod graph_api;
pub mod metrics;
pub mod modules;
/// Handle to a Fuel service that has been spawned onto the Tokio runtime.
///
/// Use `run` to wait for the service to finish on its own, or `stop` to
/// request a shutdown and wait for it.
pub struct FuelService {
/// Join handle of the task created in `spawn_service`.
handle: JoinHandle<()>,
/// Sender that `stop` fires to ask the spawned task to finish.
shutdown: oneshot::Sender<()>,
/// Sync listener obtained from the relayer module; `None` when
/// `modules.relayer` is `None`. Only present with the `relayer` feature.
#[cfg(feature = "relayer")]
relayer_handle: Option<fuel_relayer::RelayerSynced>,
/// Socket address reported by `graph_api::start_server`.
pub bound_address: SocketAddr,
}
/// State owned by the spawned service task: the background tasks to await,
/// the started modules, and the means to stop the GraphQL API.
struct FuelServiceInner {
/// Background task handles awaited by `run_inner`; `init_service` pushes
/// the API server task here.
tasks: Vec<JoinHandle<Result<(), AnyError>>>,
/// Modules returned by `modules::start_modules`; stopped at the end of `run`.
modules: Modules,
/// Socket address reported by `graph_api::start_server`.
pub bound_address: SocketAddr,
/// Sender half of the channel whose receiver was handed to
/// `graph_api::start_server`; fired by `shutdown_signal`.
stop_graphql_api: oneshot::Sender<()>,
}
impl FuelService {
    /// Creates and starts a node from `config`, opening the database that
    /// `config.database_type` selects.
    ///
    /// The configuration is normalised with `make_config_consistent` before
    /// anything else happens.
    ///
    /// # Errors
    ///
    /// Returns an error if the database cannot be opened or if
    /// `init_service` fails.
    #[tracing::instrument(skip(config))]
    pub async fn new_node(mut config: Config) -> Result<Self, AnyError> {
        Self::make_config_consistent(&mut config);
        let database = match config.database_type {
            #[cfg(feature = "rocksdb")]
            DbType::RocksDb => Database::open(&config.database_path)?,
            DbType::InMemory => Database::in_memory(),
            // Without the `rocksdb` feature every other database type falls
            // back to the in-memory database.
            #[cfg(not(feature = "rocksdb"))]
            _ => Database::in_memory(),
        };
        let service = Self::init_service(database, config).await?;
        Ok(Self::spawn_service(service))
    }

    /// Spawns `service` onto the Tokio runtime and returns the handle used to
    /// wait for it or stop it.
    fn spawn_service(service: FuelServiceInner) -> Self {
        let bound_address = service.bound_address;
        let (shutdown, stop_rx) = oneshot::channel();
        // Grab the relayer sync listener before `service` is moved into the task.
        #[cfg(feature = "relayer")]
        let relayer_handle = service
            .modules
            .relayer
            .as_ref()
            .map(fuel_relayer::RelayerHandle::listen_synced);
        let handle = tokio::spawn(async move {
            let run_fut = service.run();
            let shutdown_fut = stop_rx.then(|stop| async move {
                // A dropped `shutdown` sender is not a stop request: park this
                // branch forever so that only `run_fut` can end the task.
                if stop.is_err() {
                    futures::future::pending::<()>().await;
                }
            });
            tokio::pin!(run_fut);
            tokio::pin!(shutdown_fut);
            // NOTE(review): when the stop branch wins, `run_fut` is dropped
            // unfinished, so the `modules.stop()` call at the end of
            // `FuelServiceInner::run` is skipped — confirm the modules clean
            // up correctly on drop.
            futures::future::select(shutdown_fut, run_fut).await;
        });
        Self {
            handle,
            shutdown,
            bound_address,
            #[cfg(feature = "relayer")]
            relayer_handle,
        }
    }

    /// Overwrites the `TxPool` and `BlockProducer` copies of shared settings
    /// with the top-level values, logging a warning for each mismatch found.
    fn make_config_consistent(config: &mut Config) {
        if config.txpool.chain_config != config.chain_conf {
            warn!("The `ChainConfig` of `TxPool` was inconsistent");
            config.txpool.chain_config = config.chain_conf.clone();
        }
        if config.txpool.utxo_validation != config.utxo_validation {
            warn!("The `utxo_validation` of `TxPool` was inconsistent");
            config.txpool.utxo_validation = config.utxo_validation;
        }
        if config.block_producer.utxo_validation != config.utxo_validation {
            warn!("The `utxo_validation` of `BlockProducer` was inconsistent");
            config.block_producer.utxo_validation = config.utxo_validation;
        }
    }

    /// Creates and starts a node from `config` on top of an already opened
    /// `database`.
    ///
    /// # Errors
    ///
    /// Returns an error if `init_service` fails.
    pub async fn from_database(
        database: Database,
        mut config: Config,
    ) -> Result<Self, AnyError> {
        // Apply the same normalisation as `new_node`, so both entry points
        // start the sub-services from an identical, consistent configuration.
        Self::make_config_consistent(&mut config);
        let service = Self::init_service(database, config).await?;
        Ok(Self::spawn_service(service))
    }

    /// Initialises state, starts the modules and starts the GraphQL server,
    /// returning everything the spawned task needs to drive the service.
    ///
    /// # Errors
    ///
    /// Propagates the first error from state initialisation, module start-up
    /// or server start-up.
    async fn init_service(
        database: Database,
        config: Config,
    ) -> Result<FuelServiceInner, AnyError> {
        Self::initialize_state(&config, &database)?;
        let modules = modules::start_modules(&config, &database).await?;
        // The receiver goes to the API server; the sender is kept so the
        // server can be told to stop later.
        let (stop_tx, stop_rx) = oneshot::channel();
        let (bound_address, api_server) =
            graph_api::start_server(config.clone(), database, &modules, stop_rx).await?;
        Ok(FuelServiceInner {
            tasks: vec![api_server],
            bound_address,
            modules,
            stop_graphql_api: stop_tx,
        })
    }

    /// Waits for the spawned service task to finish.
    ///
    /// # Panics
    ///
    /// Re-raises the panic if the service task panicked.
    pub async fn run(self) {
        Self::wait_for_handle(self.handle).await;
    }

    /// Requests a shutdown and waits for the spawned service task to finish.
    ///
    /// # Panics
    ///
    /// Re-raises the panic if the service task panicked.
    pub async fn stop(self) {
        let Self {
            handle, shutdown, ..
        } = self;
        // A send error means the task has already dropped its receiver;
        // joining the handle below is still the right thing to do.
        let _ = shutdown.send(());
        Self::wait_for_handle(handle).await;
    }

    /// Joins `handle`, resuming the task's panic on the calling task; join
    /// errors that are not panics are ignored.
    async fn wait_for_handle(handle: JoinHandle<()>) {
        if let Err(err) = handle.await {
            if err.is_panic() {
                panic::resume_unwind(err.into_panic());
            }
        }
    }

    /// Waits on the relayer's `await_synced` when a relayer handle is
    /// present; returns `Ok(())` immediately otherwise.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `await_synced`.
    #[cfg(feature = "relayer")]
    pub async fn await_relayer_synced(&self) -> anyhow::Result<()> {
        if let Some(relayer_handle) = &self.relayer_handle {
            relayer_handle.await_synced().await?;
        }
        Ok(())
    }
}
impl FuelServiceInner {
pub async fn run(self) {
let Self {
tasks,
modules,
stop_graphql_api,
..
} = self;
let run_fut = Self::run_inner(tasks);
let shutdown_fut = shutdown_signal(stop_graphql_api);
tokio::pin!(run_fut);
tokio::pin!(shutdown_fut);
futures::future::select(shutdown_fut, run_fut).await;
modules.stop().await;
}
async fn run_inner(tasks: Vec<JoinHandle<anyhow::Result<()>>>) {
for task in tasks {
match task.await {
Err(err) => {
if err.is_panic() {
panic::resume_unwind(err.into_panic());
}
}
Ok(Err(e)) => {
eprintln!("server error: {:?}", e);
}
Ok(Ok(_)) => {}
}
}
}
}
/// Waits for an OS termination request and then asks the GraphQL API to stop.
///
/// On Unix the wait ends on the first `SIGTERM` or `SIGINT`; on other
/// platforms it ends on `CTRL+C`. In both cases a unit value is then sent on
/// `stop_graphql_api`.
///
/// # Panics
///
/// Panics if the signal handler cannot be installed.
async fn shutdown_signal(stop_graphql_api: oneshot::Sender<()>) {
    #[cfg(unix)]
    {
        use tokio::signal::unix::{signal, SignalKind};

        let mut sigterm =
            signal(SignalKind::terminate()).expect("failed to install sigterm handler");
        let mut sigint =
            signal(SignalKind::interrupt()).expect("failed to install sigint handler");
        // The first signal of either kind ends the wait, so no loop is needed.
        tokio::select! {
            _ = sigterm.recv() => {
                tracing::info!("sigterm received");
            }
            _ = sigint.recv() => {
                tracing::info!("sigint received");
            }
        }
    }
    #[cfg(not(unix))]
    {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install CTRL+C signal handler");
        tracing::info!("CTRL+C received");
    }
    // A send error only means the receiving side is already gone, in which
    // case there is nothing left to stop.
    let _ = stop_graphql_api.send(());
}