use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use clap::Parser;
use crabka_broker::{BootstrapMode, Broker, BrokerConfig};
use crabka_log::LogConfig;
fn parse_roles_arg(roles: &[String]) -> Result<Vec<crabka_broker::config::NodeRole>, String> {
use crabka_broker::config::NodeRole;
roles
.iter()
.map(|r| match r.to_ascii_lowercase().as_str() {
"controller" => Ok(NodeRole::Controller),
"broker" => Ok(NodeRole::Broker),
other => Err(format!(
"unknown --process-roles value `{other}` (expected `controller` or `broker`)"
)),
})
.collect()
}
#[derive(Debug, Parser)]
#[command(
name = "crabka-broker",
version,
about = "Single-node Kafka-compatible broker (MVP)"
)]
struct Args {
#[arg(long, default_value = "127.0.0.1:9092", conflicts_with = "config_file")]
listen_addr: SocketAddr,
#[arg(
long,
env = "CRABKA_ADVERTISED_LISTENER",
conflicts_with = "config_file"
)]
advertised_listener: Option<String>,
#[arg(long)]
config_file: Option<PathBuf>,
#[arg(long, default_value = "./crabka-data")]
log_dir: PathBuf,
#[arg(
long,
env = "CRABKA_EXTRA_LOG_DIRS",
value_delimiter = ',',
num_args = 0..
)]
extra_log_dirs: Vec<PathBuf>,
#[arg(long, default_value_t = 1)]
broker_id: i32,
#[arg(
long,
env = "CRABKA_PROCESS_ROLES",
value_delimiter = ',',
num_args = 0..
)]
process_roles: Vec<String>,
#[arg(long, env = "CRABKA_CLUSTER_ID")]
cluster_id: Option<uuid::Uuid>,
#[arg(
long,
env = "CRABKA_METRICS_LISTEN_ADDR",
default_value = "0.0.0.0:9404"
)]
metrics_listen_addr: String,
#[arg(
long,
env = "CRABKA_PARTITION_DISK_SCAN_INTERVAL_SECS",
default_value_t = 60
)]
partition_disk_scan_interval_secs: u64,
#[arg(
long,
env = "CRABKA_CONTROLLER_BOOTSTRAP_SERVERS",
value_delimiter = ',',
num_args = 0..
)]
controller_bootstrap_servers: Vec<SocketAddr>,
#[arg(long, env = "CRABKA_CONTROLLER_AUTO_JOIN")]
controller_auto_join: bool,
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let args = Args::parse();
let otlp = crabka_broker::telemetry::OtlpConfig::from_env(
|k| std::env::var(k).ok(),
&args.broker_id.to_string(),
env!("CARGO_PKG_VERSION"),
"crabka-broker",
);
let telemetry = crabka_broker::telemetry::init(
otlp,
"crabka_broker=info,crabka_log=info,info",
"info,crabka_broker::request=debug,crabka_log=info",
"crabka-broker",
)?;
let file_config: Option<crabka_broker::file_config::FileConfig> =
match args.config_file.as_ref() {
Some(p) => {
let contents = std::fs::read_to_string(p)
.map_err(|e| format!("failed to read {}: {e}", p.display()))?;
Some(
toml::from_str(&contents)
.map_err(|e| format!("failed to parse {}: {e}", p.display()))?,
)
}
None => None,
};
let advertised = args
.advertised_listener
.unwrap_or_else(|| args.listen_addr.to_string());
let controller_addr: std::net::SocketAddr = {
let mut a = args.listen_addr;
a.set_port(9093);
a
};
let node_id = u64::try_from(args.broker_id).unwrap_or_else(|_| {
eprintln!("broker_id must be non-negative");
std::process::exit(1);
});
let metrics_listen_addr = parse_metrics_addr(&args.metrics_listen_addr)?;
let mut config = BrokerConfig {
broker_id: args.broker_id,
listen_addr: args.listen_addr,
advertised_listener: advertised,
log_dir: args.log_dir,
extra_log_dirs: args.extra_log_dirs,
log_config: LogConfig::default(),
node_id,
controller_listen_addr: controller_addr,
controller_quorum_voters: vec![(node_id, controller_addr)],
bootstrap_servers: args.controller_bootstrap_servers,
directory_id: uuid::Uuid::nil(),
auto_join: args.controller_auto_join,
observer_lag_bound: 100,
heartbeat_interval_ms: 3_000,
heartbeat_timeout_ms: 9_000,
replica_lag_time_max_ms: 30_000,
controller_election_timeout: std::time::Duration::from_secs(5),
controller_heartbeat_interval: std::time::Duration::from_millis(500),
bootstrap_mode: BootstrapMode::Bootstrap,
cluster_id: args.cluster_id,
metrics_listen_addr,
partition_disk_scan_interval_secs: args.partition_disk_scan_interval_secs,
..BrokerConfig::default()
};
if !args.process_roles.is_empty() {
config.roles = parse_roles_arg(&args.process_roles)?;
}
if let Some(fc) = file_config {
fc.apply_to(&mut config)?;
}
config.bootstrap_mode = detect_bootstrap_mode(&config.log_dir);
config.directory_id = crabka_broker::bootstrap::read_directory_id(&config.log_dir)?;
tracing::info!(
bootstrap_mode = ?config.bootstrap_mode,
directory_id = %config.directory_id,
log_dir = %config.log_dir.display(),
"selected bootstrap mode"
);
let handle = Broker::start(config).await?;
tracing::info!(addr = %handle.listen_addr(), "crabka-broker listening");
let mut shutdown_rx = handle.should_shutdown_rx();
tokio::select! {
r = tokio::signal::ctrl_c() => {
r?;
tracing::info!("shutdown signal received");
}
() = async {
loop {
if *shutdown_rx.borrow_and_update() { break; }
if shutdown_rx.changed().await.is_err() { break; }
}
} => {
tracing::error!("self-shutdown triggered (all log dirs offline); stopping broker");
}
}
handle.shutdown().await;
tracing::info!("crabka-broker stopped");
telemetry.shutdown();
Ok(())
}
fn parse_metrics_addr(s: &str) -> Result<Option<SocketAddr>, Box<dyn std::error::Error>> {
let trimmed = s.trim();
if trimmed.is_empty() || trimmed.eq_ignore_ascii_case("none") {
return Ok(None);
}
Ok(Some(trimmed.parse()?))
}
fn detect_bootstrap_mode(log_dir: &Path) -> BootstrapMode {
let raft_log_dir = log_dir.join("__cluster_metadata").join("@metadata-0");
let has_state = match std::fs::read_dir(&raft_log_dir) {
Ok(mut entries) => entries.next().is_some(),
Err(_) => false,
};
if has_state {
BootstrapMode::Rejoin
} else {
BootstrapMode::Bootstrap
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use tempfile::tempdir;
#[test]
fn detect_bootstrap_when_log_dir_is_empty() {
let dir = tempdir().unwrap();
assert!(detect_bootstrap_mode(dir.path()) == BootstrapMode::Bootstrap);
}
#[test]
fn parse_roles_arg_maps_strings() {
assert!(
parse_roles_arg(&["controller".to_string(), "broker".to_string()]).unwrap()
== vec![
crabka_broker::config::NodeRole::Controller,
crabka_broker::config::NodeRole::Broker
]
);
}
#[test]
fn parse_roles_arg_rejects_unknown() {
assert!(parse_roles_arg(&["nope".to_string()]).is_err());
}
#[test]
fn detect_bootstrap_when_metadata_dir_missing() {
let dir = tempdir().unwrap();
std::fs::write(dir.path().join("bootstrap.json"), "{}").unwrap();
assert!(detect_bootstrap_mode(dir.path()) == BootstrapMode::Bootstrap);
}
#[test]
fn detect_rejoin_when_metadata_dir_has_state() {
let dir = tempdir().unwrap();
let meta = dir.path().join("__cluster_metadata").join("@metadata-0");
std::fs::create_dir_all(&meta).unwrap();
std::fs::write(meta.join("00000000000000000000.log"), b"segment").unwrap();
assert!(detect_bootstrap_mode(dir.path()) == BootstrapMode::Rejoin);
}
#[test]
fn detect_bootstrap_when_metadata_dir_empty() {
let dir = tempdir().unwrap();
std::fs::create_dir_all(dir.path().join("__cluster_metadata").join("@metadata-0")).unwrap();
assert!(detect_bootstrap_mode(dir.path()) == BootstrapMode::Bootstrap);
}
#[test]
fn detect_bootstrap_when_only_outer_cluster_metadata_dir_exists() {
let dir = tempdir().unwrap();
std::fs::create_dir_all(dir.path().join("__cluster_metadata")).unwrap();
assert!(detect_bootstrap_mode(dir.path()) == BootstrapMode::Bootstrap);
}
#[test]
fn config_file_mutually_exclusive_with_listen_addr() {
use clap::Parser;
let res = Args::try_parse_from([
"crabka-broker",
"--config-file=/tmp/a.toml",
"--listen-addr=127.0.0.1:9092",
]);
let err = res.expect_err("expected mutual-exclusion error");
let s = err.to_string();
assert!(
s.contains("config-file") && s.contains("listen-addr"),
"expected clap conflict mentioning both flags, got: {s}"
);
}
#[test]
fn config_file_mutually_exclusive_with_advertised_listener() {
use clap::Parser;
let res = Args::try_parse_from([
"crabka-broker",
"--config-file=/tmp/a.toml",
"--advertised-listener=h:9092",
]);
let err = res.expect_err("expected mutual-exclusion error");
let s = err.to_string();
assert!(
s.contains("config-file") && s.contains("advertised-listener"),
"expected clap conflict, got: {s}"
);
}
#[test]
fn config_file_alone_parses() {
use clap::Parser;
let args = Args::try_parse_from(["crabka-broker", "--config-file=/tmp/a.toml"]).unwrap();
assert!(args.config_file.as_deref() == Some(std::path::Path::new("/tmp/a.toml")));
assert!(args.advertised_listener.is_none());
}
}