use clap::Parser;
use fluxmq::{tls::TlsConfig, BrokerConfig, BrokerId, BrokerServer, Result};
use tracing::{error, info, warn};
use tracing_subscriber;
#[derive(Parser, Debug)]
#[command(name = "fluxmq")]
#[command(about = "A Kafka-compatible message broker written in Rust")]
struct Args {
#[arg(long, default_value = "0.0.0.0")]
host: String,
#[arg(short, long, default_value = "9092")]
port: u16,
#[arg(short, long, default_value = "info")]
log_level: String,
#[arg(long, default_value = "0")]
broker_id: BrokerId,
#[arg(long)]
enable_replication: bool,
#[arg(long)]
enable_consumer_groups: bool,
#[arg(long)]
recovery_mode: bool,
#[arg(long, default_value = "./data")]
data_dir: String,
#[arg(long, default_value = "8080")]
metrics_port: Option<u16>,
#[arg(long)]
enable_tls: bool,
#[arg(long)]
tls_cert: Option<String>,
#[arg(long)]
tls_key: Option<String>,
#[arg(long)]
tls_port: Option<u16>,
#[arg(long)]
enable_acl: bool,
#[arg(long)]
acl_config: Option<String>,
#[arg(long)]
acl_allow_everyone: bool,
#[arg(long)]
super_users: Option<String>,
}
#[tokio::main]
async fn main() -> Result<()> {
let args = Args::parse();
tracing_subscriber::fmt()
.with_max_level(parse_log_level(&args.log_level))
.init();
info!("Starting FluxMQ broker on {}:{}", args.host, args.port);
info!("Broker ID: {}", args.broker_id);
info!("Replication enabled: {}", args.enable_replication);
info!("Consumer groups enabled: {}", args.enable_consumer_groups);
info!("Recovery mode: {}", args.recovery_mode);
info!("Data directory: {}", args.data_dir);
info!("TLS enabled: {}", args.enable_tls);
if args.enable_tls {
if let (Some(ref cert_path), Some(ref key_path)) = (&args.tls_cert, &args.tls_key) {
info!("TLS certificate: {}", cert_path);
info!("TLS key: {}", key_path);
if let Some(tls_port) = args.tls_port {
info!("TLS port: {}", tls_port);
} else {
info!("TLS port: {} (auto)", args.port + 1000);
}
} else {
error!("TLS enabled but certificate and/or key path not provided");
error!("Use --tls-cert and --tls-key to specify certificate files");
return Err(fluxmq::FluxmqError::Config(
"Missing TLS configuration".to_string(),
));
}
}
info!("ACL enabled: {}", args.enable_acl);
if args.enable_acl {
info!("ACL allow everyone: {}", args.acl_allow_everyone);
if let Some(ref acl_config) = args.acl_config {
info!("ACL config file: {}", acl_config);
}
if let Some(ref super_users) = args.super_users {
info!("Super users: {}", super_users);
}
}
if let Some(metrics_port) = args.metrics_port {
info!("Metrics HTTP server port: {}", metrics_port);
} else {
info!("Metrics HTTP server: disabled");
}
let tls_config = if args.enable_tls {
if let (Some(cert_path), Some(key_path)) = (args.tls_cert, args.tls_key) {
Some(TlsConfig::new(cert_path, key_path))
} else {
None
}
} else {
None
};
let super_users = if let Some(super_users_str) = args.super_users {
super_users_str
.split(',')
.map(|s| s.trim().to_string())
.collect()
} else {
vec!["admin".to_string()] };
let config = BrokerConfig {
host: args.host,
port: args.port,
broker_id: args.broker_id,
data_dir: args.data_dir,
enable_replication: args.enable_replication,
enable_consumer_groups: args.enable_consumer_groups,
recovery_mode: args.recovery_mode,
metrics_port: args.metrics_port,
enable_tls: args.enable_tls,
tls_port: args.tls_port,
tls_config,
enable_acl: args.enable_acl,
acl_config_file: args.acl_config,
allow_everyone: args.acl_allow_everyone,
super_users,
..Default::default()
};
let server = BrokerServer::new_async(config).await?;
server.run().await
}
fn parse_log_level(level: &str) -> tracing::Level {
match level.to_lowercase().as_str() {
"trace" => tracing::Level::TRACE,
"debug" => tracing::Level::DEBUG,
"info" => tracing::Level::INFO,
"warn" => tracing::Level::WARN,
"error" => tracing::Level::ERROR,
_ => {
warn!("Invalid log level '{}', defaulting to 'info'", level);
tracing::Level::INFO
}
}
}