<file>turbopack/crates/turbopack-trace-utils/src/exit.rs</file>
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex, OnceLock},
};
use anyhow::Result;
use tokio::{select, sync::mpsc, task::JoinSet};
/// A guard that owns a value which has to be released before the process
/// stops.
///
/// The wrapped value is dropped when the [`ExitGuard`] itself is dropped. It
/// might also be dropped earlier, on Ctrl-C.
pub struct ExitGuard<T>(Arc<Mutex<Option<T>>>);

impl<T> Drop for ExitGuard<T> {
    /// Takes the wrapped value out of the shared slot (if it is still there)
    /// and drops it while the lock is held.
    ///
    /// A poisoned mutex is recovered rather than unwrapped: panicking inside
    /// `drop` would abort the process when already unwinding, and it would
    /// also leave the wrapped value unreleased.
    fn drop(&mut self) {
        let mut slot = self
            .0
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        drop(slot.take());
    }
}
impl<T: Send + 'static> ExitGuard<T> {
    /// Wraps `guard` so that it is dropped when Ctrl-C is pressed or when the
    /// returned [ExitGuard] is dropped, whichever happens first.
    ///
    /// A Tokio task is spawned that waits for Ctrl-C, drops the wrapped value
    /// and then exits the process with exit code `0`.
    pub fn new(guard: T) -> Result<Self> {
        let shared = Arc::new(Mutex::new(Some(guard)));
        let for_signal = Arc::clone(&shared);

        tokio::spawn(async move {
            tokio::signal::ctrl_c().await.unwrap();
            {
                // release the wrapped value while holding the lock
                let mut slot = for_signal.lock().unwrap();
                drop(slot.take());
            }
            std::process::exit(0);
        });

        Ok(ExitGuard(shared))
    }
}
/// A boxed, pinned, sendable future without output; the form in which exit
/// work is passed through the [`ExitHandler`] channel.
type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
/// The singular global [`ExitHandler`], set by `ExitHandler::listen`. This is
/// primarily used to ensure `ExitHandler::listen` is only called once.
///
/// The global handler is intentionally not exposed, so that APIs that depend on
/// exit behavior are required to take the `ExitHandler`. This ensures that the
/// `ExitHandler` is configured before these APIs are run, and that these
/// consumers can be used with a callback (e.g. a mock) instead.
static GLOBAL_EXIT_HANDLER: OnceLock<Arc<ExitHandler>> = OnceLock::new();
/// Handle used to schedule futures that should run when the process exits.
///
/// Futures registered with [`ExitHandler::on_exit`] are sent over an unbounded
/// channel to the matching [`ExitReceiver`].
pub struct ExitHandler {
// sending half of the channel carrying the scheduled exit futures
tx: mpsc::UnboundedSender<BoxExitFuture>,
}
impl ExitHandler {
/// Waits for `SIGINT` using [`tokio::signal::ctrl_c`], and exits the
/// process with exit code `0` after running any futures scheduled with
/// [`ExitHandler::on_exit`].
///
/// As this uses global process signals, this must only be called once, and
/// will panic if called multiple times. Use this when you own the
/// process (e.g. `turbopack-cli`).
///
/// If you don't own the process (e.g. you're called as a library, such as
/// in `next-swc`), use [`ExitHandler::new_trigger`] instead.
///
/// This may listen for other signals, like `SIGTERM` or `SIGPIPE` in the
/// future.
pub fn listen() -> &'static Arc<ExitHandler> {
let (handler, receiver) = Self::new_receiver();
if GLOBAL_EXIT_HANDLER.set(handler).is_err() {
panic!("ExitHandler::listen must only be called once");
}
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("failed to set ctrl_c handler");
receiver.run_exit_handler().await;
std::process::exit(0);
});
GLOBAL_EXIT_HANDLER.get().expect("value is set")
}
/// Creates an [`ExitHandler`] that can be manually controlled with an
/// [`ExitReceiver`].
///
/// This does not actually exit the process or listen for any signals. If
/// you'd like that behavior, use [`ExitHandler::listen`].
///
/// Because this API has no global side-effects and can be called many times
/// within the same process, it is possible to use it to provide a mock
/// [`ExitHandler`] inside unit tests.
pub fn new_receiver() -> (Arc<ExitHandler>, ExitReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
(Arc::new(ExitHandler { tx }), ExitReceiver { rx })
}
/// Register this given [`Future`] to run upon process exit.
///
/// As there are many ways for a process be killed that are outside of a
/// process's own control (e.g. `SIGKILL` or `SIGSEGV`), this API is
/// provided on a best-effort basis.
pub fn on_exit(&self, fut: impl Future<Output = ()> + Send + 'static) {
// realistically, this error case can only happen with the `new_receiver` API.
self.tx
.send(Box::pin(fut))
.expect("cannot send future after process exit");
}
}
/// Provides a way to run futures scheduled with an [`ExitHandler`].
///
/// Created together with its handler by [`ExitHandler::new_receiver`].
pub struct ExitReceiver {
// receiving half of the channel carrying the scheduled exit futures
rx: mpsc::UnboundedReceiver<BoxExitFuture>,
}
impl ExitReceiver {
/// Call this when the process exits to run the futures scheduled via
/// [`ExitHandler::on_exit`].
///
/// As this is intended to be used in a library context, this does not exit
/// the process. It is expected that the process will not exit until
/// this async method finishes executing.
///
/// Additional work can be scheduled using [`ExitHandler::on_exit`] even
/// while this is running, and it will execute before this function
/// finishes. Work attempted to be scheduled after this finishes will panic.
///
/// # Panics
///
/// Panics if joining any of the spawned futures returns an error.
pub async fn run_exit_handler(mut self) {
// every scheduled future runs as its own task inside this set
let mut set = JoinSet::new();
// spawn everything that was already queued before this call
while let Ok(fut) = self.rx.try_recv() {
set.spawn(fut);
}
loop {
select! {
// `biased` polls the branches in order: futures queued in the meantime
// are spawned before a finished task is reaped
biased;
Some(fut) = self.rx.recv() => {
set.spawn(fut);
},
val = set.join_next() => {
match val {
Some(Ok(())) => {}
Some(Err(_)) => panic!("ExitHandler future panicked!"),
// the set is empty: all spawned futures have completed
None => return,
}
},
}
}
}
}
#[cfg(test)]
mod tests {
#![allow(clippy::needless_return)] // tokio macro-generated code doesn't respect this
use std::{
future::Future,
pin::Pin,
sync::{
atomic::{AtomicBool, AtomicU32, Ordering},
Arc,
},
};
use super::ExitHandler;
/// A future scheduled before `run_exit_handler` is called must have run by
/// the time `run_exit_handler` returns.
#[tokio::test]
async fn test_on_exit() {
let (handler, receiver) = ExitHandler::new_receiver();
let called = Arc::new(AtomicBool::new(false));
handler.on_exit({
let called = Arc::clone(&called);
async move {
// yield once so the flag is not set on the first poll
tokio::task::yield_now().await;
called.store(true, Ordering::SeqCst);
}
});
receiver.run_exit_handler().await;
assert!(called.load(Ordering::SeqCst));
}
/// Futures that schedule further work while the exit handler is already
/// running must all be executed before `run_exit_handler` returns.
#[tokio::test]
async fn test_queue_while_exiting() {
let (handler, receiver) = ExitHandler::new_receiver();
let call_count = Arc::new(AtomicU32::new(0));
type BoxExitFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
// this struct is needed to construct the recursive closure type
#[derive(Clone)]
struct GetFut {
handler: Arc<ExitHandler>,
call_count: Arc<AtomicU32>,
}
impl GetFut {
// builds a future that bumps the counter and re-schedules itself
fn get(self) -> BoxExitFuture {
Box::pin(async move {
tokio::task::yield_now().await;
// `fetch_add` returns the previous value, so re-scheduling stops
// once the counter has reached 100
if self.call_count.fetch_add(1, Ordering::SeqCst) < 99 {
// queue more work while the exit handler is running
Arc::clone(&self.handler).on_exit(self.get())
}
})
}
}
handler.on_exit(
GetFut {
handler: Arc::clone(&handler),
call_count: Arc::clone(&call_count),
}
.get(),
);
receiver.run_exit_handler().await;
assert_eq!(call_count.load(Ordering::SeqCst), 100);
}
}
----
<file>common/task/src/signal.rs</file>
use crate::{manager::SentError, TaskManager};
/// Waits until the process receives `SIGINT`, `SIGTERM` or `SIGQUIT` and logs
/// which of them arrived.
///
/// # Panics
///
/// Panics if the `SIGTERM` or `SIGQUIT` listener cannot be set up.
#[cfg(unix)]
pub async fn wait_for_signal() {
    use tokio::signal::unix::{signal, SignalKind};

    let mut terminate = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
    let mut quit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");

    tokio::select! {
        _ = tokio::signal::ctrl_c() => log::info!("Received SIGINT"),
        _ = terminate.recv() => log::info!("Received SIGTERM"),
        _ = quit.recv() => log::info!("Received SIGQUIT"),
    }
}
/// Waits until the process receives Ctrl-C (`SIGINT`) and logs it.
///
/// On non-unix targets this is the only signal that is listened for.
#[cfg(not(unix))]
pub async fn wait_for_signal() {
    // the outcome of the wait is deliberately ignored
    let _ = tokio::signal::ctrl_c().await;
    log::info!("Received SIGINT");
}
/// Waits until either a shutdown signal (`SIGINT`, `SIGTERM`, `SIGQUIT`) is
/// received or the task manager reports a task error.
///
/// Returns `Ok(())` for a signal and `Err` with the reported error otherwise.
///
/// # Panics
///
/// Panics if the `SIGTERM` or `SIGQUIT` listener cannot be set up.
#[cfg(unix)]
pub async fn wait_for_signal_and_error(shutdown: &mut TaskManager) -> Result<(), SentError> {
    use tokio::signal::unix::{signal, SignalKind};

    let mut terminate = signal(SignalKind::terminate()).expect("Failed to setup SIGTERM channel");
    let mut quit = signal(SignalKind::quit()).expect("Failed to setup SIGQUIT channel");

    // `None` means a signal was received, `Some` carries the task error
    let task_error = tokio::select! {
        _ = tokio::signal::ctrl_c() => {
            log::info!("Received SIGINT");
            None
        }
        _ = terminate.recv() => {
            log::info!("Received SIGTERM");
            None
        }
        _ = quit.recv() => {
            log::info!("Received SIGQUIT");
            None
        }
        Some(err) = shutdown.wait_for_error() => {
            log::info!("Task error: {:?}", err);
            Some(err)
        }
    };

    match task_error {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
/// Waits until either Ctrl-C (`SIGINT`) is received or the task manager
/// reports a task error.
///
/// Returns `Ok(())` for the signal and `Err` with the reported error otherwise.
#[cfg(not(unix))]
pub async fn wait_for_signal_and_error(shutdown: &mut TaskManager) -> Result<(), SentError> {
    // `None` means the signal was received, `Some` carries the task error
    let task_error = tokio::select! {
        _ = tokio::signal::ctrl_c() => {
            log::info!("Received SIGINT");
            None
        }
        Some(err) = shutdown.wait_for_error() => {
            log::info!("Task error: {:?}", err);
            Some(err)
        }
    };

    match task_error {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
<file>nym-node-status-api/nym-node-status-api/src/main.rs</file>
use clap::Parser;
use nym_crypto::asymmetric::ed25519::PublicKey;
use nym_task::signal::wait_for_signal;
mod cli;
mod db;
mod http;
mod logging;
mod mixnet_scraper;
mod monitor;
mod node_scraper;
mod testruns;
mod utils;
/// Entry point: sets up logging, parses the CLI arguments, initialises the
/// database, spawns the background tasks (mixnet scraper, monitor, testruns,
/// node scraper), starts the HTTP API and then waits for a shutdown signal
/// before shutting the HTTP server down.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
logging::setup_tracing_logger()?;
let args = cli::Cli::parse();
// every configured agent key must be a valid base58 public key, otherwise startup fails
let agent_key_list = args
.agent_key_list
.iter()
.map(|value| PublicKey::from_base58_string(value.trim()).map_err(anyhow::Error::from))
.collect::<anyhow::Result<Vec<_>>>()?;
tracing::info!("Registered {} agent keys", agent_key_list.len());
let connection_url = args.database_url.clone();
tracing::debug!("Using config:\n{:#?}", args);
let storage = db::Storage::init(connection_url).await?;
let db_pool = storage.pool_owned();
// Start the node scraper
let scraper = mixnet_scraper::Scraper::new(storage.pool_owned());
tokio::spawn(async move {
scraper.start().await;
});
// node geocache is shared between node monitor and HTTP server
let geocache = moka::future::Cache::builder()
.time_to_live(args.geodata_ttl)
.build();
// Start the monitor
let args_clone = args.clone();
let geocache_clone = geocache.clone();
tokio::spawn(async move {
monitor::spawn_in_background(
db_pool,
args_clone.nym_api_client_timeout,
args_clone.nyxd_addr,
args_clone.monitor_refresh_interval,
args_clone.ipinfo_api_token,
geocache_clone,
)
.await;
tracing::info!("Started monitor task");
});
testruns::spawn(storage.pool_owned(), args.testruns_refresh_interval).await;
// Start the metrics scraper on its own database pool handle
let db_pool_scraper = storage.pool_owned();
tokio::spawn(async move {
node_scraper::spawn_in_background(db_pool_scraper, args_clone.nym_api_client_timeout).await;
tracing::info!("Started metrics scraper task");
});
// Start the HTTP API; failing to start it is fatal
let shutdown_handles = http::server::start_http_api(
storage.pool_owned(),
args.http_port,
args.nym_http_cache_ttl,
agent_key_list.to_owned(),
args.max_agent_count,
geocache,
)
.await
.expect("Failed to start server");
tracing::info!("Started HTTP server on port {}", args.http_port);
wait_for_signal().await; // block here until a shutdown signal is received
// a failed HTTP server shutdown is only logged; the process still exits with `Ok`
if let Err(err) = shutdown_handles.shutdown().await {
tracing::error!("{err}");
};
Ok(())
}
<file>tools/nymvisor/src/tasks/launcher/mod.rs</file>
// Copyright 2023 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: Apache-2.0
use crate::config::Config;
use crate::daemon::Daemon;
use crate::error::NymvisorError;
use crate::tasks::launcher::backup::BackupBuilder;
use crate::upgrades::types::{CurrentVersionInfo, UpgradeInfo};
use crate::upgrades::{perform_upgrade, types::UpgradePlan, UpgradeResult};
use futures::future::{FusedFuture, OptionFuture};
use futures::{FutureExt, StreamExt};
use nym_async_file_watcher::FileWatcherEventReceiver;
use nym_task::signal::wait_for_signal;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::pin;
use tokio::time::sleep;
use tracing::{debug, error, info, warn};
mod backup;
/// Runs the managed daemon and reacts to changes of the upgrade plan file.
pub(crate) struct DaemonLauncher {
// nymvisor configuration: daemon settings and the file paths used by the launcher
config: Config,
// receiver of file watcher events; each event triggers a re-check of the upgrade plan
upgrade_plan_watcher: FileWatcherEventReceiver,
}
impl DaemonLauncher {
pub(crate) fn new(config: Config, upgrade_plan_watcher: FileWatcherEventReceiver) -> Self {
DaemonLauncher {
config,
upgrade_plan_watcher,
}
}
/// Keeps running the daemon, restarting it after upgrades and failures as
/// permitted by the configuration.
///
/// Returns `Ok(())` when the daemon finished without an upgrade, when an
/// upgrade requires manual intervention, or when the binary has been swapped
/// while restarting after an upgrade is disabled.
///
/// # Errors
///
/// - [`NymvisorError::DisabledRestartOnFailure`] if the daemon failed and
///   restarting on failure is disabled,
/// - [`NymvisorError::DaemonMaximumStartupFailures`] if the number of
///   consecutive failures within the startup period reached the configured
///   maximum.
pub(crate) async fn run_loop(&mut self, args: Vec<String>) -> Result<(), NymvisorError> {
    // number of consecutive failures that happened within the startup period
    let mut startup_failures = 0;
    loop {
        let run_start = tokio::time::Instant::now();
        let res = self.run_and_upgrade(args.clone()).await;
        let run_duration = run_start.elapsed();
        info!(
            "the daemon has run for {}",
            humantime::format_duration(run_duration)
        );

        match res {
            Ok(upgrade_result) => {
                if upgrade_result.requires_manual_intervention {
                    info!("this upgrade requires manual intervention. Please read the release notes carefully and follow the provided instructions before starting nymvisor again");
                    return Ok(());
                }
                if upgrade_result.binary_swapped {
                    if !self.config.daemon.debug.restart_after_upgrade {
                        info!("upgrade detected, DAEMON_RESTART_AFTER_UPGRADE is off. Verify new upgrade and start nymvisor again");
                        return Ok(());
                    }
                    // else - binary has been swapped and restarting is enabled: do restart
                } else {
                    // binary has finished its execution (short-lived process) without upgrades
                    return Ok(());
                }
            }
            Err(failure) => {
                error!("daemon failed with the following error: {failure}");
                if !self.config.daemon.debug.restart_on_failure {
                    return Err(NymvisorError::DisabledRestartOnFailure);
                }

                if run_duration < self.config.daemon.debug.startup_period_duration {
                    startup_failures += 1;
                } else {
                    // the daemon outlived its startup period, so this failure is not a
                    // startup failure and it ends the current streak
                    startup_failures = 0;
                }

                if startup_failures >= self.config.daemon.debug.max_startup_failures {
                    return Err(NymvisorError::DaemonMaximumStartupFailures {
                        failures: startup_failures,
                    });
                }

                info!(
                    "waiting for {} before attempting to restart the daemon...",
                    humantime::format_duration(self.config.daemon.debug.failure_restart_delay)
                );
                sleep(self.config.daemon.debug.failure_restart_delay).await;
                // restart
            }
        }
        info!("the daemon will be now restarted")
    }
}
/// the full upgrade process, i.e. run until upgrade, do backup and perform the upgrade.
///
/// returns the [`UpgradeResult`] of the performed upgrade. If the daemon stopped without
/// an upgrade being due (it terminated by itself or nymvisor got interrupted), no backup
/// or upgrade is attempted and a "short-lived" result is returned instead.
async fn run_and_upgrade(&mut self, args: Vec<String>) -> Result<UpgradeResult, NymvisorError> {
    // `args` is owned and not needed afterwards, so it is moved rather than cloned
    let upgrade_available = self.wait_for_upgrade_or_termination(args).await?;
    if !upgrade_available {
        return Ok(UpgradeResult::new_shortlived());
    }

    if !self.config.daemon.debug.unsafe_skip_backup {
        self.perform_backup()?;
    }

    // if we ever wanted to introduce any pre-upgrade scripts like cosmovisor, they'd go here
    perform_upgrade(&self.config).await
}
/// called whenever the file watcher reports a change of the upgrade plan file (and once
/// when the daemon is started).
///
/// returns the time left until the next queued upgrade, or `None` if the plan could not be
/// loaded or contains no upcoming upgrade. A duration that cannot be converted (e.g. the
/// upgrade time has already passed) becomes the default, i.e. zero, duration.
fn check_upgrade_plan_changes(&self) -> Option<Duration> {
    info!("checking changes in the upgrade plan file...");

    let loaded = UpgradePlan::try_load(self.config.upgrade_plan_filepath());
    if let Err(err) = &loaded {
        error!("failed to read the current upgrade plan: {err}");
    }
    let plan = loaded.ok()?;

    let upcoming = plan.next_upgrade()?;
    let now = OffsetDateTime::now_utc();
    let until_upgrade = upcoming.upgrade_time - now;
    Some(until_upgrade.try_into().unwrap_or_default())
}
// responsible for running until exit or until update is detected
//
// returns `Ok(true)` if the daemon has been stopped because the upgrade timeout has elapsed
// (i.e. an upgrade should be performed now) and `Ok(false)` if the daemon has terminated by
// itself or if it has been stopped due to nymvisor receiving an interrupt.
// it fails if the stored upgrade/version information cannot be loaded or doesn't match
// the daemon build, if the daemon cannot be started, or if the daemon terminates by itself with an error.
async fn wait_for_upgrade_or_termination(
&mut self,
args: Vec<String>,
) -> Result<bool, NymvisorError> {
let daemon = Daemon::from_config(&self.config);
let current_info = UpgradeInfo::try_load(self.config.current_upgrade_info_filepath())?;
let expected_version =
CurrentVersionInfo::try_load(self.config.current_daemon_version_filepath())?;
let daemon_info = daemon.get_build_information()?;
// refuse to start the daemon if the stored information is inconsistent with the actual binary
current_info.ensure_matches(&expected_version)?;
if expected_version.binary_details != daemon_info {
return Err(NymvisorError::UnexpectedDaemonBuild {
daemon_info: Box::new(daemon_info),
stored_info: Box::new(expected_version.binary_details),
});
}
let mut running_daemon = daemon.execute_async(args)?;
let interrupt_handle = running_daemon.interrupt_handle();
// we need to fuse the daemon future so that we could check if it has already terminated
let mut fused_runner = running_daemon.fuse();
// optional timer for the next known upgrade; empty if there is nothing queued
let mut upgrade_timeout: OptionFuture<_> = self
.check_upgrade_plan_changes()
.map(sleep)
.map(Box::pin)
.map(FutureExt::fuse)
.into();
let signal_fut = wait_for_signal();
pin!(signal_fut);
let mut received_interrupt = false;
loop {
tokio::select! {
daemon_res = &mut fused_runner => {
warn!("the daemon has terminated by itself - was it a short lived command?");
let exit_status = daemon_res?;
info!("it finished with the following exit status: {exit_status}");
return Ok(false)
}
event = self.upgrade_plan_watcher.next() => {
let Some(event) = event else {
// this is a critical failure since the file watcher task should NEVER terminate by itself
error!("CRITICAL FAILURE: the upgrade plan watcher channel got closed");
panic!("CRITICAL FAILURE: the upgrade plan watcher channel got closed")
};
debug!("the file has changed - {event:?}");
// the plan has changed: replace the timer if there is an upcoming upgrade
if let Some(next_upgrade) = self.check_upgrade_plan_changes() {
info!("setting the upgrade timeout to {}", humantime::format_duration(next_upgrade));
upgrade_timeout = Some(Box::pin(sleep(next_upgrade)).fuse()).into()
}
}
// the guard keeps an already terminated (or empty) timer from being polled again
_ = &mut upgrade_timeout, if !upgrade_timeout.is_terminated() => {
info!("the upgrade timeout has elapsed. the daemon will be now stopped in order to perform the upgrade");
break
}
_ = &mut signal_fut => {
received_interrupt = true;
info!("the nymvisor has received an interrupt. the daemon will be now stopped before exiting");
break
}
}
}
if fused_runner.is_terminated() {
return Ok(false);
}
// stop the daemon and wait for it to finish; its exit status/error is only logged
interrupt_handle.interrupt_daemon();
match fused_runner.await {
Ok(exit_status) => {
info!("the daemon finished with the following exit status: {exit_status}");
}
Err(err) => {
warn!("the daemon finished with an error: {err}");
}
}
// if we received an interrupt, don't try to perform upgrade, just exit the nymvisor
Ok(!received_interrupt)
}
/// backs up the daemon home directory into the backup directory of the next queued upgrade.
///
/// fails if the upgrade plan cannot be loaded, if it contains no queued upgrade
/// ([`NymvisorError::NoQueuedUpgrades`]) or if the backup itself fails.
fn perform_backup(&self) -> Result<(), NymvisorError> {
    let plan = UpgradePlan::try_load(self.config.upgrade_plan_filepath())?;

    // this should NEVER be possible, but because those famous last words have been said before,
    // let's just return an error when it inevitably happens
    let upgrade_name = plan
        .next_upgrade()
        .map(|upgrade| &upgrade.name)
        .ok_or(NymvisorError::NoQueuedUpgrades)?;

    let backup_dir = self.config.daemon_upgrade_backup_dir(upgrade_name);
    BackupBuilder::new(backup_dir)?.backup_daemon_home(&self.config.daemon.home)
}
}
<file>nyx-chain-watcher/src/cli/commands/run/mod.rs</file>
// Copyright 2024 - Nym Technologies SA <contact@nymtech.net>
// SPDX-License-Identifier: GPL-3.0-only
use crate::error::NyxChainWatcherError;
use anyhow::Context;
use std::time::Duration;
use time::OffsetDateTime;
use tokio::task::{JoinHandle, JoinSet};
use tokio_util::sync::CancellationToken;
use tracing::{error, info};
mod args;
mod config;
use crate::chain_scraper::run_chain_scraper;
use crate::db::DbPool;
use crate::http::state::{BankScraperModuleState, PaymentListenerState, PriceScraperState};
use crate::payment_listener::PaymentListener;
use crate::price_scraper::PriceScraper;
use crate::{db, http};
pub(crate) use args::Args;
use nym_task::signal::wait_for_signal;
/// Stores the start time, end time and optional error message of this watcher
/// run in the `watcher_execution` table.
///
/// This is best-effort: a failed insert is only logged and otherwise ignored.
async fn try_insert_watcher_execution_information(
db_pool: DbPool,
start: OffsetDateTime,
end: OffsetDateTime,
error_message: Option<String>,
) {
// the result is deliberately discarded after the error (if any) has been logged
let _ = sqlx::query!(
r#"
INSERT INTO watcher_execution(start, end, error_message)
VALUES (?, ?, ?)
"#,
start,
end,
error_message
)
.execute(&db_pool)
.await
.inspect_err(|err| error!("failed to insert run information: {err}"));
}
/// Waits for the first of: a shutdown signal, cancellation of the scraper
/// token, cancellation of the main token, or termination of any spawned task.
///
/// Whichever happens first, both tokens are cancelled, the remaining tasks
/// are aborted after a short grace period and the execution information
/// (with an error message for everything but the signal) is written to the
/// database.
async fn wait_for_shutdown(
db_pool: DbPool,
start: OffsetDateTime,
main_cancellation_token: CancellationToken,
scraper_cancellation_token: CancellationToken,
mut tasks: JoinSet<Option<anyhow::Result<()>>>,
) {
// shared tail of every shutdown path: cancel, wait, abort, record the run
async fn finalize_shutdown(
db_pool: DbPool,
start: OffsetDateTime,
main_cancellation_token: CancellationToken,
scraper_cancellation_token: CancellationToken,
mut tasks: JoinSet<Option<anyhow::Result<()>>>,
error_message: Option<String>,
) {
// cancel all tasks
main_cancellation_token.cancel();
scraper_cancellation_token.cancel();
// stupid nasty and hacky workaround to make sure all relevant tasks have finished before hard aborting them
// nasty stupid and hacky workaround
tokio::time::sleep(Duration::from_secs(1)).await;
tasks.abort_all();
// insert execution result into the db
try_insert_watcher_execution_information(
db_pool,
start,
OffsetDateTime::now_utc(),
error_message,
)
.await
}
tokio::select! {
// graceful shutdown
_ = wait_for_signal() => {
info!("received shutdown signal");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, None).await;
}
_ = scraper_cancellation_token.cancelled() => {
info!("the scraper has issued cancellation");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected scraper task cancellation".into())).await;
}
_ = main_cancellation_token.cancelled() => {
info!("one of the tasks has cancelled the token");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, Some("unexpected main task cancellation".into())).await;
}
task_result = tasks.join_next() => {
// the first unwrap is fine => join set was not empty
let error_message = match task_result.unwrap() {
Err(_join_err) => Some("unexpected join error".to_string()),
// the task finished successfully: nothing to record as an error
Ok(Some(Ok(_))) => None,
Ok(Some(Err(err))) => Some(err.to_string()),
// the task returned `None`, which is recorded as a cancellation
Ok(None) => {
Some("unexpected task cancellation".to_string())
}
};
error!("unexpected task termination: {error_message:?}");
finalize_shutdown(db_pool, start, main_cancellation_token, scraper_cancellation_token, tasks, error_message).await;
}
}
}
/// Runs the chain watcher: loads the run config, initialises the database,
/// spawns the chain scraper startup, payment listener, price scraper and HTTP
/// API tasks and then waits until shutdown.
///
/// Returns an error if the config cannot be built, the database directory or
/// storage cannot be set up, the payment listener or HTTP API cannot be
/// constructed, or the chain scraper fails during its startup.
pub(crate) async fn execute(args: Args, http_port: u16) -> Result<(), NyxChainWatcherError> {
// recorded so that the run's start time can be stored on shutdown
let start = OffsetDateTime::now_utc();
info!("passed arguments: {args:#?}");
let config = config::get_run_config(args)?;
let db_path = config.database_path();
info!("Config is {config:#?}");
info!(
"Database path is {:?}",
std::path::Path::new(&db_path)
.canonicalize()
.unwrap_or_default()
);
info!(
"Chain History Database path is {:?}",
std::path::Path::new(&config.chain_scraper_database_path()).canonicalize()
);
// Ensure parent directory exists
if let Some(parent) = std::path::Path::new(&db_path).parent() {
std::fs::create_dir_all(parent)?;
}
let connection_url = format!("sqlite://{}?mode=rwc", db_path);
let storage = db::Storage::init(connection_url).await?;
let watcher_pool = storage.pool_owned();
let mut tasks = JoinSet::new();
let cancellation_token = CancellationToken::new();
let price_scraper_pool = storage.pool_owned();
let scraper_pool = storage.pool_owned();
let shutdown_pool = storage.pool_owned();
// construct shared state
let payment_listener_shared_state = PaymentListenerState::new();
let price_scraper_shared_state = PriceScraperState::new();
let bank_scraper_module_shared_state = BankScraperModuleState::new();
// spawn all the tasks
// 1. chain scraper (note: this doesn't really spawn the full scraper on this task, but we don't want to be blocking waiting for its startup)
let scraper_token_handle: JoinHandle<anyhow::Result<CancellationToken>> = tokio::spawn({
let config = config.clone();
let shared_state = bank_scraper_module_shared_state.clone();
async move {
// this only blocks until startup sync is done; it then runs on its own set of tasks
let scraper = run_chain_scraper(&config, scraper_pool, shared_state).await?;
Ok(scraper.cancel_token())
}
});
// 2. payment listener
let token = cancellation_token.clone();
let payment_watcher_config = config.payment_watcher_config.clone();
let payment_listener = PaymentListener::new(
price_scraper_pool,
payment_watcher_config,
payment_listener_shared_state.clone(),
)?;
{
tasks.spawn(async move {
token
.run_until_cancelled(async move {
payment_listener.run().await;
Ok(())
})
.await
});
}
// 3. price scraper (note, this task never terminates on its own)
let price_scraper = PriceScraper::new(price_scraper_shared_state.clone(), watcher_pool);
{
let token = cancellation_token.clone();
tasks.spawn(async move {
token
.run_until_cancelled(async move {
price_scraper.run().await;
Ok(())
})
.await
});
}
// 4. http api
let http_server = http::server::build_http_api(
storage.pool_owned(),
&config,
http_port,
payment_listener_shared_state,
price_scraper_shared_state,
bank_scraper_module_shared_state,
)
.await?;
{
let token = cancellation_token.clone();
tasks.spawn(async move {
info!("Starting HTTP server on port {http_port}",);
// wrapped in `Some` to match the output type of the other tasks in the set
async move {
Some(
http_server
.run(token.cancelled_owned())
.await
.context("http server failure"),
)
}
.await
});
}
// 1. wait for either shutdown or scraper having finished startup
tokio::select! {
_ = wait_for_signal() => {
info!("received shutdown signal while waiting for scraper to finish its startup");
return Ok(())
}
scraper_token = scraper_token_handle => {
let scraper_token = match scraper_token {
Ok(Ok(token)) => token,
Ok(Err(startup_err)) => {
error!("failed to startup the chain scraper: {startup_err}");
return Err(startup_err.into());
}
// NOTE(review): a join error of the startup task is logged but still reported as success - confirm this is intended
Err(runtime_err) => {
error!("failed to finish the scraper startup task: {runtime_err}");
return Ok(())
}
};
// 2. the scraper is up: block until shutdown has been handled
wait_for_shutdown(shutdown_pool, start, cancellation_token, scraper_token, tasks).await
}
}
Ok(())
}