use clap::Parser;
use mq_bridge_app::config::{load_config, AppConfig};
use mq_bridge_app::web_ui;
use std::net::SocketAddr;
use std::time::Duration;
use tracing::{info, warn};
use tracing_subscriber::fmt::format::FmtSpan;
use tracing_subscriber::EnvFilter;
use anyhow::Context;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
#[arg(short, long)]
config: Option<String>,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let (mut config, config_file_path): (AppConfig, String) =
load_config(args.config).context("Failed to load configuration")?;
init_logging(&config);
if config.routes.is_empty() {
if config.ui_addr.is_empty() {
config.ui_addr = "0.0.0.0:9090".to_string();
}
if config.metrics_addr.is_empty() {
config.metrics_addr = "0.0.0.0:9090".to_string();
}
}
let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
let (recorder, metrics_task) =
if !config.metrics_addr.is_empty() && config.metrics_addr != config.ui_addr {
let addr: SocketAddr = config.metrics_addr.parse().context(format!(
"Failed to parse metrics listen address: {}",
config.metrics_addr
))?;
let (recorder, server_future) = builder.with_http_listener(addr).build()?;
info!("Prometheus exporter listening on http://{}", addr);
(recorder, Some(tokio::spawn(server_future)))
} else {
(builder.build_recorder(), None)
};
let prometheus_handle = recorder.handle();
metrics::set_global_recorder(recorder).context("Failed to install Prometheus recorder")?;
metrics::describe_gauge!(
"mq_bridge_app_info",
"Information about the mq-bridge-app application"
);
metrics::gauge!("mq_bridge_app_info", "version" => env!("CARGO_PKG_VERSION")).set(1.0);
if !config.ui_addr.is_empty() {
info!(
"Prometheus metrics enabled on Web UI (http://{}/metrics)",
config.ui_addr
);
}
println!(
r#"
┌────── mq-bridge-app ──────┐
──────┴───────────────────────────┴──────"#
);
let web_ui_handle = if !config.ui_addr.is_empty() {
let addr = &config.ui_addr;
let socket_addr: SocketAddr = addr
.parse()
.with_context(|| format!("Failed to parse UI listen address: {}", addr))?;
let port = socket_addr.port();
let host = if socket_addr.ip().is_unspecified() {
"localhost".to_string()
} else {
socket_addr.ip().to_string()
};
println!(
r#" Web UI: http://{}:{}
"#,
host, port
);
let web_ui_server = web_ui::start_web_server(
addr.into(),
config.clone(),
prometheus_handle,
config_file_path,
);
Some(tokio::spawn(web_ui_server))
} else {
println!(
r#" Starting without UI server
"#
);
None
};
if config.routes.is_empty() {
warn!("No routes configured. Waiting for configuration via Web UI.");
} else {
let routes = std::mem::take(&mut config.routes);
for (_, route) in &routes {
if route.is_ref() {
route.register_output_endpoint(None)?;
}
}
for (name, route) in routes {
route.deploy(&name).await?;
}
}
info!("Bridge running. Waiting for signal.");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Ctrl+C (SIGINT) received.");
},
_ = platform_specific_shutdown() => {
info!("Shutdown signal received.");
},
}
info!("Shutdown signal received. Broadcasting to all tasks...");
let shutdown_task = async {
let routes = mq_bridge::list_routes();
if !routes.is_empty() {
info!("Attempting to gracefully stop {} routes...", routes.len());
for name in routes {
mq_bridge::stop_route(&name).await;
}
}
};
if tokio::time::timeout(Duration::from_secs(10), shutdown_task)
.await
.is_err()
{
warn!("Graceful shutdown timed out after 10 seconds. Forcing shutdown.");
} else {
info!("All routes stopped gracefully.");
}
if let Some(task) = metrics_task {
task.abort();
}
if let Some(handle) = web_ui_handle {
handle.abort();
}
info!("Shutdown complete.");
Ok(())
}
fn init_logging(config: &AppConfig) {
if std::env::var("TOKIO_CONSOLE").is_ok() {
warn!("Tokio console subscriber not initialized. Cannot run `tokio-console` to connect.");
return;
}
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(config.log_level.clone()));
let logger = tracing_subscriber::fmt()
.with_env_filter(env_filter)
.with_span_events(FmtSpan::CLOSE) .with_target(true);
match config.logger.as_str() {
"json" => {
logger.json().init();
}
"plain" => {
logger.init();
}
_ => {
logger.init();
}
}
tracing::debug!(
"Logging initialized with level {} and logger {}",
config.log_level,
config.logger
);
}
async fn platform_specific_shutdown() {
#[cfg(unix)]
{
use tokio::signal::unix::{signal, SignalKind};
match signal(SignalKind::terminate()) {
Ok(mut stream) => {
use tracing::info;
stream.recv().await;
info!("SIGTERM received.");
}
Err(e) => {
warn!(
"Failed to install SIGTERM handler: {}. This signal will be ignored.",
e
);
std::future::pending::<()>().await;
}
}
}
#[cfg(not(unix))]
std::future::pending::<()>().await
}