#![allow(clippy::doc_markdown)]
#![allow(clippy::struct_excessive_bools)]
use anyhow::{Context, Result};
use clap::{ArgAction, Args, Subcommand};
use dialoguer::Confirm;
use mqtt5::broker::{BrokerConfig, MqttBroker};
use std::path::{Path, PathBuf};
use tokio::signal;
use tracing::{debug, info};
use super::parsers::{parse_delivery_strategy, parse_duration_secs};
fn parse_storage_backend(
s: &str,
) -> std::result::Result<mqtt5::broker::config::StorageBackend, String> {
match s.to_lowercase().as_str() {
"file" => Ok(mqtt5::broker::config::StorageBackend::File),
"memory" => Ok(mqtt5::broker::config::StorageBackend::Memory),
_ => Err(format!(
"unknown storage backend: {s} (expected 'file' or 'memory')"
)),
}
}
#[derive(Args)]
pub struct BrokerCommand {
#[command(subcommand)]
pub command: Option<BrokerSubcommand>,
#[command(flatten)]
pub run_args: RunArgs,
}
#[derive(Subcommand)]
pub enum BrokerSubcommand {
GenerateConfig(GenerateConfigArgs),
}
#[derive(Args)]
pub struct GenerateConfigArgs {
#[arg(long, short)]
pub output: Option<PathBuf>,
#[arg(long, short, default_value = "json")]
pub format: String,
}
#[derive(Args)]
pub struct RunArgs {
#[arg(long, short, env = "MQTT5_CONFIG")]
pub config: Option<PathBuf>,
#[arg(long, short = 'H', action = ArgAction::Append, env = "MQTT5_BIND", value_delimiter = ',')]
pub host: Vec<String>,
#[arg(long, default_value = "10000", env = "MQTT5_MAX_CLIENTS")]
pub max_clients: usize,
#[arg(long, num_args = 0..=1, default_missing_value = "true", env = "MQTT5_ALLOW_ANONYMOUS")]
pub allow_anonymous: Option<bool>,
#[arg(long, env = "MQTT5_AUTH_PASSWORD_FILE")]
pub auth_password_file: Option<PathBuf>,
#[arg(long, env = "MQTT5_ACL_FILE")]
pub acl_file: Option<PathBuf>,
#[arg(long, value_parser = ["password", "scram", "jwt", "jwt-federated"], env = "MQTT5_AUTH_METHOD")]
pub auth_method: Option<String>,
#[arg(long, env = "MQTT5_SCRAM_FILE")]
pub scram_file: Option<PathBuf>,
#[arg(long, value_parser = ["hs256", "rs256", "es256"], env = "MQTT5_JWT_ALGORITHM")]
pub jwt_algorithm: Option<String>,
#[arg(long, env = "MQTT5_JWT_KEY_FILE")]
pub jwt_key_file: Option<PathBuf>,
#[arg(long, env = "MQTT5_JWT_ISSUER")]
pub jwt_issuer: Option<String>,
#[arg(long, env = "MQTT5_JWT_AUDIENCE")]
pub jwt_audience: Option<String>,
#[arg(long, default_value = "60", value_parser = parse_duration_secs, env = "MQTT5_JWT_CLOCK_SKEW")]
pub jwt_clock_skew: u64,
#[arg(long, env = "MQTT5_JWT_JWKS_URI")]
pub jwt_jwks_uri: Option<String>,
#[arg(long, env = "MQTT5_JWT_FALLBACK_KEY")]
pub jwt_fallback_key: Option<PathBuf>,
#[arg(long, default_value = "3600", value_parser = parse_duration_secs, env = "MQTT5_JWT_JWKS_REFRESH")]
pub jwt_jwks_refresh: u64,
#[arg(long, env = "MQTT5_JWT_ROLE_CLAIM")]
pub jwt_role_claim: Option<String>,
#[arg(long, action = ArgAction::Append, env = "MQTT5_JWT_ROLE_MAP", value_delimiter = ',')]
pub jwt_role_map: Vec<String>,
#[arg(long, env = "MQTT5_JWT_DEFAULT_ROLES")]
pub jwt_default_roles: Option<String>,
#[arg(long, value_parser = ["merge", "replace"], default_value = "merge", env = "MQTT5_JWT_ROLE_MERGE_MODE")]
pub jwt_role_merge_mode: String,
#[arg(long, value_parser = ["identity-only", "claim-binding", "trusted-roles"], env = "MQTT5_JWT_AUTH_MODE")]
pub jwt_auth_mode: Option<String>,
#[arg(long, action = ArgAction::Append, env = "MQTT5_JWT_TRUSTED_ROLE_CLAIM", value_delimiter = ',')]
pub jwt_trusted_role_claim: Vec<String>,
#[arg(long, num_args = 0..=1, default_missing_value = "true", env = "MQTT5_JWT_SESSION_SCOPED_ROLES")]
pub jwt_session_scoped_roles: Option<bool>,
#[arg(long, env = "MQTT5_JWT_ISSUER_PREFIX")]
pub jwt_issuer_prefix: Option<String>,
#[arg(long, env = "MQTT5_JWT_CONFIG_FILE")]
pub jwt_config_file: Option<PathBuf>,
#[arg(long, env = "MQTT5_TLS_CERT")]
pub tls_cert: Option<PathBuf>,
#[arg(long, env = "MQTT5_TLS_KEY")]
pub tls_key: Option<PathBuf>,
#[arg(long, env = "MQTT5_TLS_CA_CERT")]
pub tls_ca_cert: Option<PathBuf>,
#[arg(long, env = "MQTT5_TLS_REQUIRE_CLIENT_CERT")]
pub tls_require_client_cert: bool,
#[arg(long, action = ArgAction::Append, env = "MQTT5_TLS_BIND", value_delimiter = ',')]
pub tls_host: Vec<String>,
#[arg(long, action = ArgAction::Append, env = "MQTT5_WS_BIND", value_delimiter = ',')]
pub ws_host: Vec<String>,
#[arg(long, action = ArgAction::Append, env = "MQTT5_WS_TLS_BIND", value_delimiter = ',')]
pub ws_tls_host: Vec<String>,
#[arg(long, default_value = "/mqtt", env = "MQTT5_WS_PATH")]
pub ws_path: String,
#[arg(long, action = ArgAction::Append, env = "MQTT5_QUIC_BIND", value_delimiter = ',')]
pub quic_host: Vec<String>,
#[arg(long, value_parser = parse_delivery_strategy, env = "MQTT5_QUIC_DELIVERY_STRATEGY")]
pub quic_delivery_strategy: Option<mqtt5::broker::config::ServerDeliveryStrategy>,
#[arg(long, env = "MQTT5_QUIC_EARLY_DATA")]
pub quic_early_data: bool,
#[arg(long, default_value = "./mqtt_storage", env = "MQTT5_STORAGE_DIR")]
pub storage_dir: PathBuf,
#[arg(long, default_value = "file", value_parser = parse_storage_backend, env = "MQTT5_STORAGE_BACKEND")]
pub storage_backend: mqtt5::broker::config::StorageBackend,
#[arg(long, env = "MQTT5_NO_PERSISTENCE")]
pub no_persistence: bool,
#[arg(long, default_value = "3600", value_parser = parse_duration_secs, env = "MQTT5_SESSION_EXPIRY")]
pub session_expiry: u64,
#[arg(long, default_value = "2", env = "MQTT5_MAX_QOS")]
pub max_qos: u8,
#[arg(long, value_parser = parse_duration_secs, env = "MQTT5_KEEP_ALIVE")]
pub keep_alive: Option<u64>,
#[arg(long, env = "MQTT5_RESPONSE_INFORMATION")]
pub response_information: Option<String>,
#[arg(long, env = "MQTT5_NO_RETAIN")]
pub no_retain: bool,
#[arg(long, env = "MQTT5_NO_WILDCARDS")]
pub no_wildcards: bool,
#[arg(long, env = "MQTT5_NON_INTERACTIVE")]
pub non_interactive: bool,
#[cfg(feature = "opentelemetry")]
#[arg(long, env = "MQTT5_OTEL_ENDPOINT")]
pub otel_endpoint: Option<String>,
#[cfg(feature = "opentelemetry")]
#[arg(long, default_value = "mqttv5-broker", env = "MQTT5_OTEL_SERVICE_NAME")]
pub otel_service_name: String,
#[cfg(feature = "opentelemetry")]
#[arg(long, default_value = "1.0", env = "MQTT5_OTEL_SAMPLING")]
pub otel_sampling: f64,
}
pub async fn execute(cmd: BrokerCommand, verbose: bool, debug: bool) -> Result<()> {
if let Some(BrokerSubcommand::GenerateConfig(args)) = cmd.command {
return execute_generate_config(args).await;
}
execute_run(cmd.run_args, verbose, debug).await
}
fn build_example_config() -> BrokerConfig {
use mqtt5::broker::config::{
AuthConfig, AuthMethod, ChangeOnlyDeliveryConfig, QuicConfig, RateLimitConfig,
StorageBackend, StorageConfig, TlsConfig, WebSocketConfig,
};
BrokerConfig {
bind_addresses: vec![
"0.0.0.0:1883".parse().unwrap(),
"[::]:1883".parse().unwrap(),
],
max_clients: 10000,
session_expiry_interval: std::time::Duration::from_secs(3600),
max_packet_size: 268_435_456,
topic_alias_maximum: 65535,
retain_available: true,
maximum_qos: 2,
wildcard_subscription_available: true,
subscription_identifier_available: true,
shared_subscription_available: true,
max_subscriptions_per_client: 0,
max_retained_messages: 0,
max_retained_message_size: 0,
client_channel_capacity: 10000,
server_keep_alive: None,
server_receive_maximum: None,
response_information: None,
auth_config: AuthConfig {
allow_anonymous: true,
password_file: None,
acl_file: None,
auth_method: AuthMethod::None,
auth_data: None,
scram_file: None,
jwt_config: None,
federated_jwt_config: None,
rate_limit: RateLimitConfig::default(),
},
tls_config: Some(TlsConfig {
cert_file: PathBuf::from("/path/to/server.crt"),
key_file: PathBuf::from("/path/to/server.key"),
ca_file: None,
require_client_cert: false,
bind_addresses: vec![
"0.0.0.0:8883".parse().unwrap(),
"[::]:8883".parse().unwrap(),
],
}),
websocket_config: Some(WebSocketConfig {
bind_addresses: vec![
"0.0.0.0:8080".parse().unwrap(),
"[::]:8080".parse().unwrap(),
],
path: "/mqtt".to_string(),
subprotocol: "mqtt".to_string(),
use_tls: false,
}),
websocket_tls_config: Some(WebSocketConfig {
bind_addresses: vec![
"0.0.0.0:8443".parse().unwrap(),
"[::]:8443".parse().unwrap(),
],
path: "/mqtt".to_string(),
subprotocol: "mqtt".to_string(),
use_tls: true,
}),
quic_config: Some(QuicConfig {
cert_file: PathBuf::from("/path/to/server.crt"),
key_file: PathBuf::from("/path/to/server.key"),
ca_file: None,
require_client_cert: false,
bind_addresses: vec![
"0.0.0.0:14567".parse().unwrap(),
"[::]:14567".parse().unwrap(),
],
enable_early_data: false,
}),
cluster_listener_config: None,
storage_config: StorageConfig {
backend: StorageBackend::File,
base_dir: PathBuf::from("./mqtt_storage"),
cleanup_interval: std::time::Duration::from_secs(3600),
enable_persistence: true,
},
change_only_delivery_config: ChangeOnlyDeliveryConfig::default(),
echo_suppression_config: mqtt5::broker::config::EchoSuppressionConfig::default(),
max_outbound_rate_per_client: 0,
server_delivery_strategy: mqtt5::broker::config::ServerDeliveryStrategy::default(),
load_balancer: None,
bridges: vec![],
#[cfg(feature = "opentelemetry")]
opentelemetry_config: None,
event_handler: None,
}
}
async fn execute_generate_config(args: GenerateConfigArgs) -> Result<()> {
let config = build_example_config();
let output = match args.format.to_lowercase().as_str() {
"json" => {
serde_json::to_string_pretty(&config).context("Failed to serialize config to JSON")?
}
"toml" => toml::to_string_pretty(&config).context("Failed to serialize config to TOML")?,
_ => anyhow::bail!("Unsupported format: {}. Use 'json' or 'toml'", args.format),
};
if let Some(path) = args.output {
tokio::fs::write(&path, &output)
.await
.with_context(|| format!("Failed to write config to {}", path.display()))?;
eprintln!("Configuration written to: {}", path.display());
} else {
println!("{output}");
}
Ok(())
}
fn print_startup_info(config: &BrokerConfig, max_clients: usize, config_path: Option<&PathBuf>) {
println!("🚀 MQTT v5.0 broker starting...");
println!(
" 📡 TCP: {}",
config
.bind_addresses
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
);
if let Some(ref tls_cfg) = config.tls_config {
println!(
" 🔒 TLS: {}",
tls_cfg
.bind_addresses
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", ")
);
}
if let Some(ref ws_cfg) = config.websocket_config {
println!(
" 🌐 WebSocket: {} (path: {})",
ws_cfg
.bind_addresses
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
ws_cfg.path
);
}
if let Some(ref ws_tls_cfg) = config.websocket_tls_config {
println!(
" 🔐 WebSocket TLS: {} (path: {})",
ws_tls_cfg
.bind_addresses
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
ws_tls_cfg.path
);
}
if let Some(ref quic_cfg) = config.quic_config {
println!(
" 🚀 QUIC: {}{}",
quic_cfg
.bind_addresses
.iter()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(", "),
if quic_cfg.enable_early_data {
" (0-RTT enabled)"
} else {
""
}
);
}
println!(" 👥 Max clients: {max_clients}");
#[cfg(feature = "opentelemetry")]
if let Some(ref otel_config) = config.opentelemetry_config {
println!(
" 📊 OpenTelemetry: {} (service: {})",
otel_config.otlp_endpoint, otel_config.service_name
);
}
if config_path.is_some() {
println!(" 🔄 Hot-reload: enabled (edit config file or send SIGHUP to reload)");
}
println!(" 📝 Press Ctrl+C to stop");
}
async fn execute_run(mut cmd: RunArgs, verbose: bool, debug: bool) -> Result<()> {
#[cfg(feature = "opentelemetry")]
let has_otel = cmd.otel_endpoint.is_some();
#[cfg(not(feature = "opentelemetry"))]
let has_otel = false;
if !has_otel {
crate::init_basic_tracing(verbose, debug);
}
info!("Starting MQTT v5.0 broker...");
let (config, config_path) = if let Some(config_path) = &cmd.config {
debug!("Loading configuration from: {:?}", config_path);
let cfg = load_config_from_file(config_path)
.await
.with_context(|| format!("Failed to load config from {}", config_path.display()))?;
(cfg, Some(config_path.clone()))
} else {
(create_interactive_config(&mut cmd).await?, None)
};
config
.validate()
.context("Configuration validation failed")?;
info!(
"Creating broker with bind addresses: {:?}",
config.bind_addresses
);
let mut broker = if let Some(ref path) = config_path {
MqttBroker::with_config_file(config.clone(), path.clone())
.await
.context("Failed to create MQTT broker with hot-reload")?
} else {
MqttBroker::with_config(config.clone())
.await
.context("Failed to create MQTT broker")?
};
print_startup_info(&config, cmd.max_clients, config_path.as_ref());
let reload_sender = broker.manual_reload_sender();
#[cfg(unix)]
let mut sighup_task = None;
#[cfg(unix)]
if let Some(sender) = reload_sender {
let mut sighup_stream = signal::unix::signal(signal::unix::SignalKind::hangup())
.context("Failed to register SIGHUP handler")?;
sighup_task = Some(tokio::spawn(async move {
loop {
sighup_stream.recv().await;
info!("Received SIGHUP, triggering config reload");
if sender.send(()).await.is_err() {
break;
}
}
}));
}
let shutdown_signal = async {
match signal::ctrl_c().await {
Ok(()) => {
println!("\n🛑 Received Ctrl+C, shutting down gracefully...");
}
Err(err) => {
tracing::error!("Unable to listen for shutdown signal: {err}");
}
}
};
tokio::select! {
result = broker.run() => {
match result {
Ok(()) => {
info!("Broker stopped normally");
}
Err(e) => {
anyhow::bail!("Broker error: {e}");
}
}
}
() = shutdown_signal => {
info!("Shutdown signal received, stopping broker...");
}
}
#[cfg(unix)]
if let Some(handle) = sighup_task {
handle.abort();
}
println!("✓ MQTT broker stopped");
Ok(())
}
fn resolve_auth_settings(cmd: &RunArgs) -> Result<bool> {
if let Some(allow_anon) = cmd.allow_anonymous {
return Ok(allow_anon);
}
if cmd.auth_password_file.is_some() {
info!("Password file provided, anonymous access disabled by default");
return Ok(false);
}
if cmd.scram_file.is_some() {
info!("SCRAM credentials file provided, anonymous access disabled by default");
return Ok(false);
}
if cmd.jwt_key_file.is_some() {
info!("JWT key file provided, anonymous access disabled by default");
return Ok(false);
}
if cmd.jwt_jwks_uri.is_some() || cmd.jwt_config_file.is_some() {
info!("Federated JWT auth configured, anonymous access disabled by default");
return Ok(false);
}
if cmd.non_interactive {
anyhow::bail!(
"No authentication configured.\n\
Use one of:\n \
--allow-anonymous Allow connections without credentials\n \
--auth-password-file <path> Require password authentication\n \
--scram-file <path> Require SCRAM-SHA-256 authentication\n \
--jwt-key-file <path> Require JWT authentication\n \
--jwt-jwks-uri <url> Require federated JWT authentication"
);
}
eprintln!("⚠️ No authentication configured.");
let allow_anon = Confirm::new()
.with_prompt("Allow anonymous connections? (not recommended for production)")
.default(false)
.interact()
.context("Failed to get authentication choice")?;
Ok(allow_anon)
}
fn resolve_auth_method(cmd: &RunArgs) -> Result<mqtt5::broker::config::AuthMethod> {
use mqtt5::broker::config::AuthMethod;
match cmd.auth_method.as_deref() {
Some("scram") => {
let Some(scram_file) = &cmd.scram_file else {
anyhow::bail!("--scram-file is required when using --auth-method scram");
};
if !scram_file.exists() {
anyhow::bail!("SCRAM credentials file not found: {}", scram_file.display());
}
Ok(AuthMethod::ScramSha256)
}
Some("jwt") => {
let Some(jwt_key_file) = &cmd.jwt_key_file else {
anyhow::bail!("--jwt-key-file is required when using --auth-method jwt");
};
if !jwt_key_file.exists() {
anyhow::bail!("JWT key file not found: {}", jwt_key_file.display());
}
if cmd.jwt_algorithm.is_none() {
anyhow::bail!("--jwt-algorithm is required when using --auth-method jwt");
}
Ok(AuthMethod::Jwt)
}
Some("jwt-federated") => {
if cmd.jwt_jwks_uri.is_none() && cmd.jwt_config_file.is_none() {
anyhow::bail!(
"--jwt-jwks-uri or --jwt-config-file is required when using --auth-method jwt-federated"
);
}
if cmd.jwt_jwks_uri.is_some() {
let Some(fallback) = &cmd.jwt_fallback_key else {
anyhow::bail!("--jwt-fallback-key is required when using --jwt-jwks-uri");
};
if !fallback.exists() {
anyhow::bail!("JWT fallback key file not found: {}", fallback.display());
}
if cmd.jwt_issuer.is_none() {
anyhow::bail!("--jwt-issuer is required when using --jwt-jwks-uri");
}
}
if let Some(config_file) = &cmd.jwt_config_file {
if !config_file.exists() {
anyhow::bail!("JWT config file not found: {}", config_file.display());
}
}
Ok(AuthMethod::JwtFederated)
}
Some("password") | None => {
if cmd.auth_password_file.is_some() {
Ok(AuthMethod::Password)
} else {
Ok(AuthMethod::None)
}
}
Some(other) => anyhow::bail!("Unknown auth method: {other}"),
}
}
fn build_jwt_config(cmd: &RunArgs) -> Result<Option<mqtt5::broker::config::JwtConfig>> {
use mqtt5::broker::config::{JwtAlgorithm, JwtConfig};
let algorithm = match cmd.jwt_algorithm.as_deref() {
Some("hs256") => JwtAlgorithm::HS256,
Some("rs256") => JwtAlgorithm::RS256,
Some("es256") => JwtAlgorithm::ES256,
_ => anyhow::bail!("Invalid JWT algorithm"),
};
let mut jwt_cfg = JwtConfig::new(algorithm, cmd.jwt_key_file.clone().unwrap());
jwt_cfg.clock_skew_secs = cmd.jwt_clock_skew;
if let Some(ref issuer) = cmd.jwt_issuer {
jwt_cfg.issuer = Some(issuer.clone());
}
if let Some(ref audience) = cmd.jwt_audience {
jwt_cfg.audience = Some(audience.clone());
}
Ok(Some(jwt_cfg))
}
async fn build_federated_jwt_config(
cmd: &RunArgs,
) -> Result<Option<mqtt5::broker::config::FederatedJwtConfig>> {
use mqtt5::broker::config::{
ClaimPattern, FederatedAuthMode, FederatedJwtConfig, JwtIssuerConfig, JwtKeySource,
JwtRoleMapping, RoleMergeMode,
};
if let Some(config_file) = &cmd.jwt_config_file {
let content = tokio::fs::read_to_string(config_file)
.await
.with_context(|| {
format!("Failed to read JWT config file: {}", config_file.display())
})?;
let config: FederatedJwtConfig = serde_json::from_str(&content)
.with_context(|| "Failed to parse JWT config file as JSON")?;
return Ok(Some(config));
}
let auth_mode = match cmd.jwt_auth_mode.as_deref() {
Some("claim-binding") => FederatedAuthMode::ClaimBinding,
Some("trusted-roles") => FederatedAuthMode::TrustedRoles,
None => match cmd.jwt_role_merge_mode.as_str() {
"replace" => FederatedAuthMode::TrustedRoles,
_ => FederatedAuthMode::ClaimBinding,
},
Some("identity-only" | _) => FederatedAuthMode::IdentityOnly,
};
let merge_mode = match cmd.jwt_role_merge_mode.as_str() {
"replace" => RoleMergeMode::Replace,
_ => RoleMergeMode::Merge,
};
let default_roles: Vec<String> = cmd
.jwt_default_roles
.as_ref()
.map(|s| s.split(',').map(|r| r.trim().to_string()).collect())
.unwrap_or_default();
let mut role_mappings = Vec::new();
if let Some(claim_path) = &cmd.jwt_role_claim {
for mapping in &cmd.jwt_role_map {
if let Some((value, role)) = mapping.split_once(':') {
role_mappings.push(JwtRoleMapping::new(
claim_path.clone(),
ClaimPattern::Equals(value.to_string()),
vec![role.to_string()],
));
}
}
}
let mut issuer_config = JwtIssuerConfig::new(
"cli-issuer",
cmd.jwt_issuer.clone().unwrap(),
JwtKeySource::Jwks {
uri: cmd.jwt_jwks_uri.clone().unwrap(),
fallback_key_file: cmd.jwt_fallback_key.clone().unwrap(),
refresh_interval_secs: cmd.jwt_jwks_refresh,
cache_ttl_secs: cmd.jwt_jwks_refresh * 24,
},
)
.with_auth_mode(auth_mode)
.with_default_roles(default_roles);
#[allow(deprecated)]
{
issuer_config.role_merge_mode = merge_mode;
}
if let Some(ref audience) = cmd.jwt_audience {
issuer_config = issuer_config.with_audience(audience.clone());
}
if !cmd.jwt_trusted_role_claim.is_empty() {
issuer_config
.trusted_role_claims
.clone_from(&cmd.jwt_trusted_role_claim);
}
if let Some(session_scoped) = cmd.jwt_session_scoped_roles {
issuer_config.session_scoped_roles = session_scoped;
}
if let Some(ref prefix) = cmd.jwt_issuer_prefix {
issuer_config.issuer_prefix = Some(prefix.clone());
}
issuer_config.role_mappings = role_mappings;
Ok(Some(FederatedJwtConfig {
issuers: vec![issuer_config],
clock_skew_secs: cmd.jwt_clock_skew,
}))
}
fn log_auth_summary(
auth_method: mqtt5::broker::config::AuthMethod,
allow_anonymous: bool,
cmd: &RunArgs,
) {
use mqtt5::broker::config::AuthMethod;
match auth_method {
AuthMethod::None if allow_anonymous => {
info!("Anonymous access enabled - clients can connect without credentials");
}
AuthMethod::Password => {
info!(
"Password authentication enabled (file: {:?})",
cmd.auth_password_file.as_ref().unwrap()
);
}
AuthMethod::ScramSha256 => {
info!(
"SCRAM-SHA-256 authentication enabled (file: {:?})",
cmd.scram_file.as_ref().unwrap()
);
}
AuthMethod::Jwt => {
info!(
"JWT authentication enabled (algorithm: {:?}, key: {:?})",
cmd.jwt_algorithm.as_ref().unwrap(),
cmd.jwt_key_file.as_ref().unwrap()
);
}
AuthMethod::JwtFederated => {
if let Some(config_file) = &cmd.jwt_config_file {
info!(
"Federated JWT authentication enabled (config: {:?})",
config_file
);
} else {
info!(
"Federated JWT authentication enabled (issuer: {:?}, jwks: {:?})",
cmd.jwt_issuer.as_ref().unwrap(),
cmd.jwt_jwks_uri.as_ref().unwrap()
);
}
}
AuthMethod::None => {}
}
if let Some(acl_file) = &cmd.acl_file {
info!("ACL authorization enabled (file: {:?})", acl_file);
}
}
fn configure_tls(config: &mut BrokerConfig, cmd: &RunArgs) -> Result<()> {
use mqtt5::broker::config::TlsConfig;
if let (Some(cert), Some(key)) = (&cmd.tls_cert, &cmd.tls_key) {
if !cert.exists() {
anyhow::bail!("TLS certificate file not found: {}", cert.display());
}
if !key.exists() {
anyhow::bail!("TLS key file not found: {}", key.display());
}
let tls_addrs: Result<Vec<std::net::SocketAddr>> = if cmd.tls_host.is_empty() {
Ok(vec![
"0.0.0.0:8883".parse().unwrap(),
"[::]:8883".parse().unwrap(),
])
} else {
cmd.tls_host
.iter()
.map(|h| {
h.parse()
.with_context(|| format!("Invalid TLS bind address: {h}"))
})
.collect()
};
let mut tls_config =
TlsConfig::new(cert.clone(), key.clone()).with_bind_addresses(tls_addrs?);
if let Some(ca_cert) = &cmd.tls_ca_cert {
if !ca_cert.exists() {
anyhow::bail!("TLS CA certificate file not found: {}", ca_cert.display());
}
tls_config = tls_config
.with_ca_file(ca_cert.clone())
.with_require_client_cert(cmd.tls_require_client_cert);
info!("TLS enabled with mTLS (client certificate verification)");
} else if cmd.tls_require_client_cert {
anyhow::bail!("--tls-ca-cert is required when --tls-require-client-cert is set");
} else {
info!("TLS enabled");
}
config.tls_config = Some(tls_config);
} else if cmd.tls_cert.is_some() || cmd.tls_key.is_some() {
anyhow::bail!("Both --tls-cert and --tls-key must be provided together");
} else if cmd.tls_ca_cert.is_some() || cmd.tls_require_client_cert {
anyhow::bail!("--tls-cert and --tls-key must be provided to use --tls-ca-cert or --tls-require-client-cert");
}
Ok(())
}
fn configure_websocket(config: &mut BrokerConfig, cmd: &RunArgs) -> Result<()> {
use mqtt5::broker::config::WebSocketConfig;
if !cmd.ws_host.is_empty() {
let ws_addrs: Result<Vec<std::net::SocketAddr>> = cmd
.ws_host
.iter()
.map(|h| {
h.parse()
.with_context(|| format!("Invalid WebSocket bind address: {h}"))
})
.collect();
let ws_config = WebSocketConfig::default()
.with_bind_addresses(ws_addrs?)
.with_path(cmd.ws_path.clone());
config.websocket_config = Some(ws_config);
info!("WebSocket enabled");
}
if !cmd.ws_tls_host.is_empty() {
if let (Some(cert), Some(key)) = (&cmd.tls_cert, &cmd.tls_key) {
if !cert.exists() {
anyhow::bail!("TLS certificate file not found: {}", cert.display());
}
if !key.exists() {
anyhow::bail!("TLS key file not found: {}", key.display());
}
let ws_tls_addrs: Result<Vec<std::net::SocketAddr>> = cmd
.ws_tls_host
.iter()
.map(|h| {
h.parse()
.with_context(|| format!("Invalid WebSocket TLS bind address: {h}"))
})
.collect();
let ws_tls_config = WebSocketConfig::default()
.with_bind_addresses(ws_tls_addrs?)
.with_path(cmd.ws_path.clone())
.with_tls(true);
config.websocket_tls_config = Some(ws_tls_config);
info!("WebSocket TLS enabled");
} else {
anyhow::bail!(
"Both --tls-cert and --tls-key must be provided when using --ws-tls-host"
);
}
}
Ok(())
}
fn configure_quic(config: &mut BrokerConfig, cmd: &RunArgs) -> Result<()> {
use mqtt5::broker::config::QuicConfig;
if cmd.quic_host.is_empty() {
return Ok(());
}
if let (Some(cert), Some(key)) = (&cmd.tls_cert, &cmd.tls_key) {
if !cert.exists() {
anyhow::bail!("TLS certificate file not found: {}", cert.display());
}
if !key.exists() {
anyhow::bail!("TLS key file not found: {}", key.display());
}
let quic_addrs: Result<Vec<std::net::SocketAddr>> = cmd
.quic_host
.iter()
.map(|h| {
h.parse()
.with_context(|| format!("Invalid QUIC bind address: {h}"))
})
.collect();
let mut quic_config =
QuicConfig::new(cert.clone(), key.clone()).with_bind_addresses(quic_addrs?);
if let Some(ca_cert) = &cmd.tls_ca_cert {
if !ca_cert.exists() {
anyhow::bail!("TLS CA certificate file not found: {}", ca_cert.display());
}
quic_config = quic_config
.with_ca_file(ca_cert.clone())
.with_require_client_cert(cmd.tls_require_client_cert);
}
if cmd.quic_early_data {
quic_config = quic_config.with_early_data(true);
}
config.quic_config = Some(quic_config);
if let Some(strategy) = cmd.quic_delivery_strategy {
config.server_delivery_strategy = strategy;
}
info!("QUIC enabled on {:?}", cmd.quic_host);
} else {
anyhow::bail!("Both --tls-cert and --tls-key must be provided when using --quic-host");
}
Ok(())
}
async fn create_interactive_config(cmd: &mut RunArgs) -> Result<BrokerConfig> {
use mqtt5::broker::config::{AuthConfig, AuthMethod, RateLimitConfig, StorageConfig};
let mut config = BrokerConfig::new();
let bind_addrs: Result<Vec<std::net::SocketAddr>> = if cmd.host.is_empty() {
Ok(vec![
"0.0.0.0:1883".parse().unwrap(),
"[::]:1883".parse().unwrap(),
])
} else {
cmd.host
.iter()
.map(|h| {
h.parse()
.with_context(|| format!("Invalid bind address: {h}"))
})
.collect()
};
config = config.with_bind_addresses(bind_addrs?);
config = config.with_max_clients(cmd.max_clients);
config.session_expiry_interval = std::time::Duration::from_secs(cmd.session_expiry);
config.maximum_qos = cmd.max_qos;
config.retain_available = !cmd.no_retain;
config.wildcard_subscription_available = !cmd.no_wildcards;
if let Some(keep_alive) = cmd.keep_alive {
config.server_keep_alive = Some(std::time::Duration::from_secs(keep_alive));
}
if let Some(ref response_info) = cmd.response_information {
config.response_information = Some(response_info.clone());
}
let allow_anonymous = resolve_auth_settings(cmd)?;
let auth_method = resolve_auth_method(cmd)?;
if let Some(password_file) = &cmd.auth_password_file {
if !password_file.exists() {
anyhow::bail!(
"Authentication password file not found: {}",
password_file.display()
);
}
}
if let Some(acl_file) = &cmd.acl_file {
if !acl_file.exists() {
anyhow::bail!("ACL file not found: {}", acl_file.display());
}
}
let jwt_config = if auth_method == AuthMethod::Jwt {
build_jwt_config(cmd)?
} else {
None
};
let federated_jwt_config = if auth_method == AuthMethod::JwtFederated {
build_federated_jwt_config(cmd).await?
} else {
None
};
let auth_config = AuthConfig {
allow_anonymous,
password_file: cmd.auth_password_file.clone(),
acl_file: cmd.acl_file.clone(),
auth_method,
auth_data: None,
scram_file: cmd.scram_file.clone(),
jwt_config,
federated_jwt_config,
rate_limit: RateLimitConfig::default(),
};
config = config.with_auth(auth_config);
log_auth_summary(auth_method, allow_anonymous, cmd);
configure_tls(&mut config, cmd)?;
configure_websocket(&mut config, cmd)?;
configure_quic(&mut config, cmd)?;
let storage_config = StorageConfig {
enable_persistence: !cmd.no_persistence,
base_dir: cmd.storage_dir.clone(),
backend: cmd.storage_backend,
cleanup_interval: std::time::Duration::from_secs(300),
};
config.storage_config = storage_config;
#[cfg(feature = "opentelemetry")]
if let Some(endpoint) = &cmd.otel_endpoint {
use mqtt5::telemetry::TelemetryConfig;
let telemetry_config = TelemetryConfig::new(&cmd.otel_service_name)
.with_endpoint(endpoint)
.with_sampling_ratio(cmd.otel_sampling);
config = config.with_opentelemetry(telemetry_config);
info!(
"OpenTelemetry enabled: endpoint={}, service={}, sampling={}",
endpoint, cmd.otel_service_name, cmd.otel_sampling
);
}
Ok(config)
}
async fn load_config_from_file(config_path: &Path) -> Result<BrokerConfig> {
let contents = tokio::fs::read_to_string(config_path)
.await
.context("Failed to read config file")?;
serde_json::from_str(&contents).context("Failed to parse config file as JSON")
}