use mq_bridge_app::{
config::{AppConfig, load_config},
mq_bridge, web_ui,
};
use clap::Parser;
use std::net::SocketAddr;
use std::time::Duration;
use tracing::{info, warn};
use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt::format::FmtSpan;
use anyhow::Context;
// Command-line arguments for the application, parsed in `main` via clap's derive API.
//
// NOTE: plain `//` comments are used here on purpose. clap's derive macro turns
// `///` doc comments into help text, so using them would change `--help` output.
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
// Forwarded as the first argument to `load_config`.
// Presumably the path of the configuration file to load — confirm against `load_config`.
#[arg(short, long)]
config: Option<String>,
// Forwarded as the second argument to `load_config`.
// NOTE(review): looks like a path to an initial/seed configuration — confirm.
#[arg(short, long)]
init_config: Option<String>,
// Forwarded as the third argument to `load_config`.
// NOTE(review): presumably an inline initial configuration string — confirm.
#[arg(long)]
init_config_str: Option<String>,
// Forwarded as the fourth argument to `load_config`.
// NOTE(review): presumably an inline configuration string — confirm.
#[arg(long)]
config_str: Option<String>,
// When set, `main` serializes the JSON schema of `AppConfig` and exits without
// loading any configuration. The value "-" prints the schema to stdout; any other
// value is treated as the output file path.
#[arg(long)]
schema: Option<String>,
}
#[tokio::main(flavor = "multi_thread")]
async fn main() -> anyhow::Result<()> {
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
let args = Args::parse();
if let Some(schema_path) = args.schema {
let schema = schemars::schema_for!(AppConfig);
let schema_json =
serde_json::to_string_pretty(&schema).context("Failed to serialize schema")?;
if schema_path == "-" {
println!("{}", schema_json);
} else {
let path = std::path::Path::new(&schema_path);
if let Some(parent) = path.parent()
&& !parent.as_os_str().is_empty()
&& !parent.exists()
{
std::fs::create_dir_all(parent)
.context("Failed to create parent directory for schema")?;
}
std::fs::write(path, schema_json).context("Failed to write schema file")?;
}
return Ok(());
}
let (mut config, config_file_path): (AppConfig, String) = load_config(
args.config,
args.init_config,
args.init_config_str,
args.config_str,
)
.context("Failed to load configuration")?;
init_logging(&config);
println!(
r#"
┌────── mq-bridge-app ──────┐
──────┴───────────────────────────┴──────"#
);
let has_persisted_config = std::path::Path::new(&config_file_path).exists();
if !has_persisted_config || config.consumers.is_empty() {
if config.metrics_addr.is_empty() {
config.metrics_addr = "0.0.0.0:9090".to_string();
}
if config.ui_addr.is_empty() {
config.ui_addr = "0.0.0.0:9091".to_string();
}
}
let mut prom_addr = None;
let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
let (recorder, metrics_task) =
if !config.metrics_addr.is_empty() && config.metrics_addr != config.ui_addr {
let addr: SocketAddr = config.metrics_addr.parse().context(format!(
"Failed to parse metrics listen address: {}",
config.metrics_addr
))?;
let (recorder, server_future) = builder.with_http_listener(addr).build()?;
prom_addr = Some(addr);
(recorder, Some(tokio::spawn(server_future)))
} else {
(builder.build_recorder(), None)
};
let prometheus_handle = recorder.handle();
metrics::set_global_recorder(recorder).context("Failed to install Prometheus recorder")?;
metrics::describe_gauge!(
"mq_bridge_app_info",
"Information about the mq-bridge-app application"
);
metrics::gauge!("mq_bridge_app_info", "version" => env!("CARGO_PKG_VERSION")).set(1.0);
let web_ui_handle = if !config.ui_addr.is_empty() {
let addr = &config.ui_addr;
let socket_addr: SocketAddr = addr
.parse()
.with_context(|| format!("Failed to parse UI listen address: {}", addr))?;
let port = socket_addr.port();
let host = if socket_addr.ip().is_unspecified() {
"localhost".to_string()
} else {
socket_addr.ip().to_string()
};
println!(
r#" Web UI: http://{}:{}
"#,
host, port
);
info!(
"Prometheus metrics enabled on Web UI (http://{}/metrics)",
config.ui_addr
);
let web_ui_server = web_ui::start_web_server(
addr.into(),
config.clone(),
prometheus_handle,
config_file_path,
);
Some(tokio::spawn(web_ui_server))
} else {
println!(
r#" Starting without UI server
"#
);
None
};
if let Some(addr) = prom_addr {
info!("Prometheus exporter listening on http://{}", addr);
}
if config.consumers.is_empty() {
warn!("No consumers configured. Waiting for configuration via Web UI.");
}
info!("Bridge running. Waiting for signal.");
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Ctrl+C (SIGINT) received.");
},
_ = platform_specific_shutdown() => {
info!("Shutdown signal received.");
},
}
info!("Shutdown signal received. Broadcasting to all tasks...");
let shutdown_task = async {
let routes = mq_bridge::list_routes();
if !routes.is_empty() {
info!("Attempting to gracefully stop {} routes...", routes.len());
for name in routes {
mq_bridge::stop_route(&name).await;
}
}
};
if tokio::time::timeout(Duration::from_secs(10), shutdown_task)
.await
.is_err()
{
warn!("Graceful shutdown timed out after 10 seconds. Forcing shutdown.");
} else {
info!("All routes stopped gracefully.");
}
if let Some(task) = metrics_task {
task.abort();
}
if let Some(handle) = web_ui_handle {
handle.abort();
}
info!("Shutdown complete.");
Ok(())
}
/// Installs the global `tracing` subscriber described by `config`.
///
/// The filter comes from the default environment filter when it is present and valid;
/// otherwise it is built from `config.log_level`. `config.logger` selects the output
/// format: `"json"` enables JSON output, any other value uses the default text format.
///
/// If the `TOKIO_CONSOLE` environment variable is set, a warning is logged because no
/// tokio-console subscriber is set up here. The regular subscriber is installed first,
/// so that the warning (and all later log output) is actually emitted instead of being
/// dropped for lack of a subscriber.
fn init_logging(config: &AppConfig) {
    let console_requested = std::env::var("TOKIO_CONSOLE").is_ok();

    let env_filter = EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| EnvFilter::new(config.log_level.clone()));
    let logger = tracing_subscriber::fmt()
        .with_env_filter(env_filter)
        .with_span_events(FmtSpan::CLOSE)
        .with_target(true);
    match config.logger.as_str() {
        "json" => logger.json().init(),
        // "plain" and any unrecognized value use the default text formatter.
        _ => logger.init(),
    }

    if console_requested {
        warn!("Tokio console subscriber not initialized. Cannot run `tokio-console` to connect.");
    }
    tracing::debug!(
        "Logging initialized with level {} and logger {}",
        config.log_level,
        config.logger
    );
}
async fn platform_specific_shutdown() {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
match signal(SignalKind::terminate()) {
Ok(mut stream) => {
use tracing::info;
stream.recv().await;
info!("SIGTERM received.");
}
Err(e) => {
warn!(
"Failed to install SIGTERM handler: {}. This signal will be ignored.",
e
);
std::future::pending::<()>().await;
}
}
}
#[cfg(not(unix))]
std::future::pending::<()>().await
}