use ibc_relayer::supervisor::SupervisorOptions;
use ibc_relayer::util::debug_section::DebugSection;
use std::error::Error;
use std::io;
use abscissa_core::clap::Parser;
use crossbeam_channel::Sender;
use ibc_relayer::chain::handle::{CachingChainHandle, ChainHandle};
use ibc_relayer::config::Config;
use ibc_relayer::registry::SharedRegistry;
use ibc_relayer::rest;
use ibc_relayer::supervisor::{cmd::SupervisorCmd, spawn_supervisor, SupervisorHandle};
use crate::conclude::json;
use crate::conclude::Output;
use crate::prelude::*;
#[derive(Clone, Command, Debug, Parser, PartialEq, Eq)]
pub struct StartCmd {
#[clap(
long = "full-scan",
help = "Force a full scan of the chains for clients, connections and channels"
)]
full_scan: bool,
}
impl Runnable for StartCmd {
fn run(&self) {
let app = app_reader();
if app.debug_enabled(DebugSection::ProfilingJson) {
use std::env;
use std::path::Path;
use ibc_relayer::util::profiling::open_or_create_profile_file;
let profile_dir = env::var("PROFILING_DIR").unwrap_or_else(|_| ".".to_string());
let now = time::OffsetDateTime::now_utc();
let path_str = format!(
"{}/hermes-{:04}-{:02}-{:02}-{:02}{:02}{:02}-prof.json",
profile_dir,
now.year(),
now.month(),
now.day(),
now.hour(),
now.minute(),
now.second()
);
open_or_create_profile_file(Path::new(&path_str));
}
let config = (*app_config()).clone();
let options = SupervisorOptions {
force_full_scan: self.full_scan,
health_check: true,
};
let supervisor_handle = make_supervisor::<CachingChainHandle>(config, options)
.unwrap_or_else(|e| {
Output::error(format!("Hermes failed to start, last error: {e}")).exit()
});
match crate::config::config_path() {
Some(_) => {
register_signals(supervisor_handle.sender.clone()).unwrap_or_else(|e| {
warn!("failed to install signal handler: {}", e);
});
}
None => {
warn!("cannot figure out configuration path, skipping registration of signal handlers");
}
};
info!("Hermes has started");
supervisor_handle.wait();
}
}
fn register_signals(tx_cmd: Sender<SupervisorCmd>) -> Result<(), io::Error> {
use signal_hook::{consts::signal::*, iterator::Signals};
let sigs = vec![
SIGHUP, SIGUSR1, ];
let mut signals = Signals::new(sigs)?;
std::thread::spawn(move || {
for signal in &mut signals {
match signal {
SIGHUP => warn!(
"configuration reloading via SIGHUP has been disabled, \
the signal handler will be removed in the future"
),
SIGUSR1 => {
info!("dumping state (triggered by SIGUSR1)");
let (tx, rx) = crossbeam_channel::bounded(1);
tx_cmd.try_send(SupervisorCmd::DumpState(tx)).unwrap();
std::thread::spawn(move || {
if let Ok(state) = rx.recv() {
if json() {
match serde_json::to_string(&state) {
Ok(out) => println!("{out}"),
Err(e) => {
error!("failed to serialize relayer state to JSON: {}", e)
}
}
} else {
state.print_info();
}
}
});
}
_ => (),
}
}
});
Ok(())
}
fn spawn_rest_server(config: &Config) -> Option<rest::Receiver> {
use ibc_relayer::util::spawn_blocking;
let _span = tracing::error_span!("rest").entered();
let rest = config.rest.clone();
if !rest.enabled {
info!("REST server disabled");
return None;
}
let (tx, rx) = crossbeam_channel::unbounded();
spawn_blocking(async move {
let result = ibc_relayer_rest::spawn((rest.host.as_str(), rest.port), tx);
match result {
Ok(handle) => {
info!(
"REST service running, exposing REST API at http://{}:{}",
rest.host, rest.port
);
if let Err(e) = handle.await {
error!("REST service crashed with error: {e}");
}
}
Err(e) => {
error!("REST service failed to start: {e}");
}
}
});
Some(rx)
}
fn spawn_telemetry_server(config: &Config) {
use ibc_relayer::util::spawn_blocking;
let _span = tracing::error_span!("telemetry").entered();
let state = ibc_telemetry::init(
config.telemetry.buckets.latency_submitted.range.clone(),
config.telemetry.buckets.latency_submitted.buckets,
config.telemetry.buckets.latency_confirmed.range.clone(),
config.telemetry.buckets.latency_confirmed.buckets,
config.telemetry.prefix.as_str(),
);
let telemetry = config.telemetry.clone();
if !telemetry.enabled {
info!("telemetry disabled");
return;
}
spawn_blocking(async move {
let result = ibc_telemetry::spawn((telemetry.host, telemetry.port), state.clone());
match result {
Ok((addr, handle)) => {
info!("telemetry service running, exposing metrics at http://{addr}/metrics");
if let Err(e) = handle.await {
error!("telemetry service crashed with error: {e}");
}
}
Err(e) => error!("telemetry service failed to start: {e}"),
}
});
}
fn make_supervisor<Chain: ChainHandle>(
config: Config,
options: SupervisorOptions,
) -> Result<SupervisorHandle, Box<dyn Error + Send + Sync>> {
let registry = SharedRegistry::<Chain>::new(config.clone());
spawn_telemetry_server(&config);
let rest_rx = spawn_rest_server(&config);
Ok(spawn_supervisor(config, registry, rest_rx, options)?)
}
#[cfg(test)]
mod tests {
use super::StartCmd;
use abscissa_core::clap::Parser;
#[test]
fn test_start_required_only() {
assert_eq!(
StartCmd { full_scan: false },
StartCmd::parse_from(["test"])
)
}
#[test]
fn test_start_full_scan() {
assert_eq!(
StartCmd { full_scan: true },
StartCmd::parse_from(["test", "--full-scan"])
)
}
}