use std::net::SocketAddr;
use schemars::JsonSchema;
use serde::Deserialize;
use crabka_security::ListenerProtocol;
use crate::config::ListenerSpec;
/// Errors returned by [`FileConfig::apply_to`] when the parsed file cannot be
/// turned into a usable broker configuration.
#[derive(Debug, thiserror::Error)]
pub enum FileConfigError {
/// A TOML section required by another setting is absent (for example
/// `[authorization.opa]` when the authorizer type is `opa`). Carries the section name.
#[error("missing required TOML section: {0}")]
MissingSection(String),
/// Constructing the OPA authorizer from `[authorization.opa]` failed. Carries the
/// debug rendering of the underlying error.
#[error("OPA authorizer configuration error: {0}")]
OpaConfig(String),
/// A value was present in the file but rejected during validation. Carries a
/// human-readable description of the problem.
#[error("invalid config: {0}")]
InvalidConfig(String),
}
// Top-level shape of the broker's TOML configuration file.
//
// Every key is optional; an empty document deserializes to `FileConfig::default()`.
// The struct does not set `deny_unknown_fields`, so unrecognized top-level keys are
// ignored. `apply_to` overlays the parsed values onto a `BrokerConfig`.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq)]
pub struct FileConfig {
// Broker id; applied only while the target config still holds its default id.
pub broker_id: Option<i32>,
// Primary log directory; applied only while the target still holds its default.
pub log_dir: Option<String>,
// Additional log directories; applied only when the target has none configured.
#[serde(default)]
pub extra_log_dirs: Vec<String>,
// Rack label; overrides the target whenever present.
#[serde(default)]
pub rack: Option<String>,
// Replica selector name, parsed with `ReplicaSelectorKind::from_config_str`.
#[serde(default)]
pub replica_selector: Option<String>,
// Name of the listener used for inter-broker traffic; overrides the target when present.
pub inter_broker_listener_name: Option<String>,
// Global connection cap; applied only while the target still holds its default.
#[serde(default)]
pub max_connections: Option<usize>,
// Per-IP connection cap; applied only while the target still holds its default.
#[serde(default)]
pub max_connections_per_ip: Option<usize>,
// Listener definitions; a non-empty list replaces the target's listeners wholesale.
#[serde(default)]
pub listeners: Vec<FileListener>,
// Free-form string properties. `apply_to` in this file does not read them.
#[serde(default)]
pub server_properties: std::collections::BTreeMap<String, String>,
// Protocol for the controller listener; applied only while the target holds its default.
// Exposed to the JSON schema as an optional string.
#[serde(default)]
#[schemars(with = "Option<String>")]
pub controller_listener_protocol: Option<ListenerProtocol>,
// Broker-wide TLS material; applied only when the target has no TLS config yet.
#[serde(default)]
pub tls_config: Option<FileTlsConfig>,
// `[oauthbearer]` section; selects and configures the OAUTHBEARER token validator.
#[serde(default)]
pub oauthbearer: Option<FileOAuthBearerConfig>,
// `[delegation_token]` section.
#[serde(default)]
pub delegation_token: Option<FileDelegationTokenConfig>,
// Top-level super users, added to the target's existing set.
// NOTE(review): `skip_serializing_if` looks inert here, since only `Deserialize` is derived.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub super_users: Option<Vec<String>>,
// `[remote_storage]` section (tiered-storage backend selection).
#[serde(default)]
pub remote_storage: Option<FileRemoteStorageConfig>,
// `[authorization]` section (authorizer type and its super users).
#[serde(default)]
pub authorization: Option<FileAuthorizationConfig>,
// `[process]` section (node roles).
#[serde(default)]
pub process: Option<FileProcessConfig>,
// `[gssapi]` section (Kerberos acceptor settings).
#[serde(default)]
pub gssapi: Option<FileGssapiConfig>,
// `[inter_broker_credentials]` section (credentials this broker presents to peers).
#[serde(default)]
pub inter_broker_credentials: Option<FileInterBrokerCredentials>,
}
// `[remote_storage]` section. Unknown keys are rejected.
//
// `storage_dir` and `s3` select the backend and are mutually exclusive; `apply_to`
// returns `InvalidConfig` when both are set. With neither set, no backend is chosen.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct FileRemoteStorageConfig {
// Directory for the local-filesystem backend.
pub storage_dir: Option<String>,
// Settings for the S3 object-store backend.
pub s3: Option<FileRemoteStorageS3Config>,
// Remote-log-metadata settings; only consulted once a backend has been selected.
pub kafka_metadata: Option<FileKafkaRlmmConfig>,
}
// `[remote_storage.kafka_metadata]` section. Unknown keys are rejected.
#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileKafkaRlmmConfig {
// Bootstrap address string; empty when omitted.
#[serde(default)]
pub bootstrap: String,
// Partition count for the metadata topic; `apply_to` falls back to 50.
#[serde(default)]
pub num_partitions: Option<i32>,
// Replication value for the metadata topic; `apply_to` falls back to 3.
#[serde(default)]
pub replication: Option<i32>,
// When true, `apply_to` selects `RlmmKind::InMemory` instead of the topic-backed variant.
#[serde(default)]
pub in_memory: bool,
}
// `[remote_storage.s3]` section. Unknown keys are rejected.
//
// `Debug` is deliberately not derived: the hand-written impl below masks the
// credential fields.
#[derive(Clone, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct FileRemoteStorageS3Config {
// Bucket name (required).
pub bucket: String,
// Region name (required).
pub region: String,
// Optional key prefix, passed through unchanged.
#[serde(default)]
pub prefix: Option<String>,
// Optional endpoint override, passed through unchanged.
#[serde(default)]
pub endpoint: Option<String>,
// Optional access key id; masked in `Debug` output.
#[serde(default)]
pub access_key_id: Option<String>,
// Optional secret access key; masked in `Debug` output.
#[serde(default)]
pub secret_access_key: Option<String>,
// Passed through to the S3 config's `allow_http`; false when omitted.
#[serde(default)]
pub allow_http: bool,
// Multipart threshold; falls back to `crabka_remote_storage::DEFAULT_MULTIPART_THRESHOLD`.
#[serde(default)]
pub multipart_threshold: Option<u64>,
// Multipart chunk size; falls back to `crabka_remote_storage::DEFAULT_MULTIPART_CHUNK_SIZE`.
#[serde(default)]
pub multipart_chunk_size: Option<usize>,
}
/// Credential-safe `Debug`: `access_key_id` and `secret_access_key` are rendered as
/// `Some("***")` when set (and `None` otherwise); all other fields print normally.
impl std::fmt::Debug for FileRemoteStorageS3Config {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder shown in place of any configured credential.
        const MASK: &str = "***";
        let masked_key_id = self.access_key_id.as_ref().map(|_| MASK);
        let masked_secret = self.secret_access_key.as_ref().map(|_| MASK);

        let mut out = f.debug_struct("FileRemoteStorageS3Config");
        out.field("bucket", &self.bucket);
        out.field("region", &self.region);
        out.field("prefix", &self.prefix);
        out.field("endpoint", &self.endpoint);
        out.field("access_key_id", &masked_key_id);
        out.field("secret_access_key", &masked_secret);
        out.field("allow_http", &self.allow_http);
        out.field("multipart_threshold", &self.multipart_threshold);
        out.field("multipart_chunk_size", &self.multipart_chunk_size);
        out.finish()
    }
}
// `[authorization]` section. Unknown keys are rejected.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileAuthorizationConfig {
// Authorizer implementation, read from the TOML key `type`; defaults to allow-all.
#[serde(rename = "type", default)]
pub authz_type: AuthzType,
// Super users for this authorizer. `apply_to` replaces the target's super-user set
// with exactly this list when the section is present.
#[serde(default)]
pub super_users: Vec<String>,
// OPA settings; required when `type = "opa"`, otherwise unused by `apply_to`.
#[serde(default)]
pub opa: Option<FileOpaConfig>,
}
// Authorizer implementations selectable via `[authorization].type`.
// Variants deserialize from their snake_case names (`allow_all`, `simple`, `opa`).
#[derive(Debug, Clone, Copy, Default, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum AuthzType {
// Default: maps to `AllowAllAuthorizer`.
#[default]
AllowAll,
// Maps to `SimpleAclAuthorizer`, constructed with the section's super users.
Simple,
// Maps to `OpaAuthorizer`; needs the `[authorization.opa]` section.
Opa,
}
// `[authorization.opa]` section. Unknown keys are rejected.
// All four values are forwarded to `OpaAuthorizer::new` by `apply_to`.
#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct FileOpaConfig {
// OPA URL (required).
pub url: String,
// Forwarded as the authorizer's allow-on-error flag; false when omitted.
#[serde(default)]
pub allow_on_error: bool,
// Cache size; 50_000 when omitted.
#[serde(default = "default_opa_maximum_cache_size")]
pub maximum_cache_size: usize,
// Cache expiry in milliseconds; one hour when omitted.
#[serde(default = "default_opa_expire_after_ms")]
pub expire_after_ms: i64,
}
/// Serde default for `FileOpaConfig::maximum_cache_size`: 50 000.
fn default_opa_maximum_cache_size() -> usize {
    const DEFAULT_MAXIMUM_CACHE_SIZE: usize = 50_000;
    DEFAULT_MAXIMUM_CACHE_SIZE
}
/// Serde default for `FileOpaConfig::expire_after_ms`: one hour, in milliseconds.
fn default_opa_expire_after_ms() -> i64 {
    const MILLIS_PER_SECOND: i64 = 1_000;
    const SECONDS_PER_HOUR: i64 = 3_600;
    SECONDS_PER_HOUR * MILLIS_PER_SECOND
}
// `[delegation_token]` section. Unknown keys are rejected.
//
// `Debug` is implemented by hand below (not derived) so that `secret_key` never
// reaches logs or panic messages, matching how the S3 credentials are handled.
#[derive(Clone, Default, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct FileDelegationTokenConfig {
    // Secret used for delegation tokens. The environment variable
    // `CRABKA_DELEGATION_TOKEN_SECRET_KEY` takes precedence over this value.
    pub secret_key: Option<String>,
    // Overrides the broker's maximum token lifetime (milliseconds) when set.
    pub max_lifetime_ms: Option<i64>,
    // Overrides the broker's expiry-check interval (milliseconds) when set.
    pub expiry_check_interval_ms: Option<i64>,
    // Overrides the broker's default renew period (milliseconds) when set.
    pub default_renew_period_ms: Option<i64>,
}

/// Secret-safe `Debug`: `secret_key` is rendered as `Some("***")` when set and `None`
/// otherwise; the remaining fields print normally.
impl std::fmt::Debug for FileDelegationTokenConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("FileDelegationTokenConfig")
            .field("secret_key", &self.secret_key.as_ref().map(|_| "***"))
            .field("max_lifetime_ms", &self.max_lifetime_ms)
            .field("expiry_check_interval_ms", &self.expiry_check_interval_ms)
            .field("default_renew_period_ms", &self.default_renew_period_ms)
            .finish()
    }
}
// `[process]` section. Unknown keys are rejected.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct FileProcessConfig {
// Node roles; each entry must be `controller` or `broker` (compared ASCII
// case-insensitively by `apply_to`). An empty list leaves the target's roles untouched.
#[serde(default)]
pub roles: Vec<String>,
}
// `[oauthbearer]` section.
//
// `apply_to` picks the validator from the two endpoint keys: `jwks_endpoint_uri`
// selects the signed-JWS validator, `introspection_endpoint_uri` selects the
// introspection validator, neither selects the unsecured validator, and setting
// both is rejected. This struct does not set `deny_unknown_fields`.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq)]
pub struct FileOAuthBearerConfig {
// Claim holding the principal name; the introspection validator uses `sub` when omitted.
#[serde(default)]
pub principal_claim_name: Option<String>,
// JsonPath expression, compiled when the config is applied.
#[serde(default)]
pub custom_claim_check: Option<String>,
// Copied to the signed and unsecured validators.
#[serde(default)]
pub valid_token_type: Option<String>,
// Clock skew tolerance in milliseconds; the introspection validator uses 30_000 when omitted.
#[serde(default)]
pub allowable_clock_skew_ms: Option<i64>,
// JWKS endpoint; selects the signed validator. Mutually exclusive with introspection.
#[serde(default)]
pub jwks_endpoint_uri: Option<String>,
// Expected issuer; only the signed validator receives it.
#[serde(default)]
pub valid_issuer_uri: Option<String>,
// Expected audience; passed to the signed and introspection validators.
#[serde(default)]
pub expected_audience: Option<String>,
// JWKS refresh interval in milliseconds (signed validator only).
#[serde(default)]
pub jwks_refresh_interval_ms: Option<u64>,
// Path copied to the broker config and also handed to the introspection client.
#[serde(default)]
pub idp_tls_trust: Option<std::path::PathBuf>,
// Introspection endpoint; selects the introspection validator.
#[serde(default)]
pub introspection_endpoint_uri: Option<String>,
// Userinfo endpoint; when set, the introspection validator's `call_userinfo` is true.
#[serde(default)]
pub userinfo_endpoint_uri: Option<String>,
// Client id for introspection; required when the introspection endpoint is set.
#[serde(default)]
pub introspection_client_id: Option<String>,
// File holding the introspection client secret; required with the introspection
// endpoint. Trailing `\n` / `\r` characters are trimmed after reading.
#[serde(default)]
pub introspection_client_secret_path: Option<std::path::PathBuf>,
// HTTP timeout for introspection in milliseconds; 10_000 when omitted.
#[serde(default)]
pub introspection_http_timeout_ms: Option<u64>,
// Copied verbatim to the broker config.
#[serde(default)]
pub max_session_lifetime_seconds: Option<u32>,
// Copied to whichever validator is selected.
#[serde(default)]
pub fallback_user_name_claim: Option<String>,
// Copied to whichever validator is selected.
#[serde(default)]
pub fallback_user_name_prefix: Option<String>,
// JsonPath expression, compiled when the config is applied.
#[serde(default)]
pub groups_claim: Option<String>,
// Copied to whichever validator is selected.
#[serde(default)]
pub groups_claim_delimiter: Option<String>,
// Minimum pause between on-demand JWKS refreshes, in seconds; 1 when omitted.
#[serde(default)]
pub jwks_min_refresh_pause_seconds: Option<u32>,
// Converted to milliseconds and stored as the signed validator's `expiry_ms`.
#[serde(default)]
pub jwks_expiry_seconds: Option<u32>,
// Stored as the broker's `oauthbearer_jwks_ignore_key_use`; false when omitted.
#[serde(default)]
pub jwks_ignore_key_use: Option<bool>,
}
/// Kerberos service name used when `service_name` is omitted from `[gssapi]` or
/// from the GSSAPI variant of `[inter_broker_credentials]`.
const DEFAULT_KERBEROS_SERVICE_NAME: &str = "kafka";
// `[gssapi]` section. Unknown keys are rejected.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct FileGssapiConfig {
// Keytab path (required when the section is present).
pub keytab_path: std::path::PathBuf,
// Service name; `DEFAULT_KERBEROS_SERVICE_NAME` is used when omitted.
#[serde(default)]
pub service_name: Option<String>,
// Rule strings, each parsed with `crabka_security::gssapi::name::Rule::parse`.
#[serde(default)]
pub principal_to_local_rules: Vec<String>,
// Passed through unchanged to `GssapiConfig::realm`.
#[serde(default)]
pub realm: Option<String>,
// Passed through unchanged to `GssapiConfig::kdc`.
#[serde(default)]
pub kdc: Option<String>,
}
// `[inter_broker_credentials]` section. The variant is chosen by the `type` key
// (kebab-case variant name, i.e. `gssapi`); unknown keys are rejected.
#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)]
#[serde(tag = "type", rename_all = "kebab-case", deny_unknown_fields)]
pub enum FileInterBrokerCredentials {
// GSSAPI credentials; mapped field-for-field onto
// `crate::config::InterBrokerCredentials::Gssapi`.
Gssapi {
keytab_path: std::path::PathBuf,
client_principal: String,
// `DEFAULT_KERBEROS_SERVICE_NAME` is used when omitted.
#[serde(default)]
service_name: Option<String>,
kdc_url: String,
},
}
// TLS material, usable both at top level (`[tls_config]`) and inline per listener.
// Conversion to the broker's TLS config always leaves `trust_roots_path` as `None`.
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq)]
pub struct FileTlsConfig {
// Mapped to the broker TLS config's `cert_chain_path`.
pub cert_path: std::path::PathBuf,
// Mapped to the broker TLS config's `private_key_path`.
pub key_path: std::path::PathBuf,
// Optional client CA path, passed through unchanged.
#[serde(default)]
pub client_ca_path: Option<std::path::PathBuf>,
// Client-certificate policy; `Disabled` when omitted.
#[serde(default)]
pub client_auth: FileClientAuthMode,
}
// File-level mirror of `crabka_security::ClientAuthMode`; each variant maps to the
// identically named broker variant. No rename is applied, so the TOML values are
// the variant names as written (`Disabled`, `Optional`, `Required`).
#[derive(Debug, Clone, Copy, Default, Deserialize, JsonSchema, PartialEq, Eq)]
pub enum FileClientAuthMode {
#[default]
Disabled,
Optional,
Required,
}
// Per-listener SASL settings (`sasl_config` on a listener).
#[derive(Debug, Clone, Default, Deserialize, JsonSchema, PartialEq)]
pub struct FileListenerSaslConfig {
// Enabled mechanisms, written in TOML as a list of mechanism names (e.g.
// `"SCRAM-SHA-512"`) and converted by `deserialize_sasl_mechanisms`. Described to
// the JSON schema as a list of strings.
#[serde(default, deserialize_with = "deserialize_sasl_mechanisms")]
#[schemars(with = "Vec<String>")]
pub enabled_mechanisms: Vec<crabka_security::SaslMechanism>,
}
fn deserialize_sasl_mechanisms<'de, D>(
deserializer: D,
) -> Result<Vec<crabka_security::SaslMechanism>, D::Error>
where
D: serde::Deserializer<'de>,
{
use serde::de::Error;
let names: Vec<String> = Vec::deserialize(deserializer)?;
names
.into_iter()
.map(|s| {
crabka_security::SaslMechanism::from_wire(&s)
.ok_or_else(|| D::Error::custom(format!("unknown SASL mechanism: {s}")))
})
.collect()
}
// One `[[listeners]]` entry. Converted to a `ListenerSpec` by `into_spec`.
#[derive(Debug, Clone, Deserialize, JsonSchema, PartialEq)]
pub struct FileListener {
// Listener name; matched against `inter_broker_listener_name`.
pub name: String,
// Socket address to bind; must parse as a `SocketAddr`. Schema type: string.
#[schemars(with = "String")]
pub bind_addr: SocketAddr,
// Advertised address string, copied through unchanged.
pub advertised: String,
// Listener protocol. Schema type: string.
#[schemars(with = "String")]
pub protocol: ListenerProtocol,
// Optional per-listener TLS material.
#[serde(default)]
pub tls_config: Option<FileTlsConfig>,
// Optional per-listener SASL settings.
#[serde(default)]
pub sasl_config: Option<FileListenerSaslConfig>,
}
impl FileConfig {
    /// Overlays the values parsed from the TOML file onto `cfg`.
    ///
    /// Precedence, as implemented below:
    /// - `broker_id`, `log_dir`, `max_connections`, `max_connections_per_ip` and
    ///   `controller_listener_protocol` are applied only while `cfg` still holds the
    ///   `BrokerConfig::default()` value; `extra_log_dirs` and the top-level
    ///   `tls_config` only while `cfg` has none.
    /// - Every other key present in the file overrides what `cfg` holds.
    /// - The environment variable `CRABKA_DELEGATION_TOKEN_SECRET_KEY` wins over
    ///   `[delegation_token].secret_key`.
    ///
    /// Sections are applied in order, so `cfg` may already be partially updated when
    /// an error is returned.
    ///
    /// # Errors
    ///
    /// - [`FileConfigError::InvalidConfig`] for an unknown `replica_selector`, an
    ///   invalid `[oauthbearer]` section (bad JsonPath expression, both endpoints set,
    ///   missing introspection client id / secret path, unreadable secret file,
    ///   introspection client construction failure), both remote-storage backends
    ///   set, an unknown process role, or an unparsable GSSAPI rule.
    /// - [`FileConfigError::MissingSection`] when `type = "opa"` lacks `[authorization.opa]`.
    /// - [`FileConfigError::OpaConfig`] when the OPA authorizer cannot be built.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding `cfg.oauthbearer_jwks_signal_rx` is poisoned.
    #[allow(clippy::too_many_lines)] // one linear pass over every config section
    pub fn apply_to(self, cfg: &mut crate::config::BrokerConfig) -> Result<(), FileConfigError> {
        let defaults = crate::config::BrokerConfig::default();
        if let Some(id) = self.broker_id
            && cfg.broker_id == defaults.broker_id
        {
            cfg.broker_id = id;
        }
        if let Some(rack) = self.rack {
            cfg.rack = Some(rack);
        }
        if let Some(sel) = self.replica_selector {
            cfg.replica_selector = crate::replica_selector::ReplicaSelectorKind::from_config_str(
                &sel,
            )
            .map_err(|bad| {
                FileConfigError::InvalidConfig(format!("unknown replica_selector: {bad}"))
            })?;
        }
        if let Some(ld) = self.log_dir
            && cfg.log_dir == defaults.log_dir
        {
            cfg.log_dir = std::path::PathBuf::from(ld);
        }
        if !self.extra_log_dirs.is_empty() && cfg.extra_log_dirs.is_empty() {
            cfg.extra_log_dirs = self
                .extra_log_dirs
                .into_iter()
                .map(std::path::PathBuf::from)
                .collect();
        }

        // A non-empty listener list in the file replaces the target's list wholesale.
        let had_file_listeners = !self.listeners.is_empty();
        if had_file_listeners {
            cfg.listeners = self
                .listeners
                .into_iter()
                .map(FileListener::into_spec)
                .collect();
        }
        if let Some(name) = self.inter_broker_listener_name {
            cfg.inter_broker_listener_name = name;
        }
        // Re-derive the advertised address from the inter-broker listener, falling
        // back to the first listener when no listener carries that name.
        if had_file_listeners
            && let Some(adv) = cfg
                .listeners
                .iter()
                .find(|l| l.name == cfg.inter_broker_listener_name)
                .or_else(|| cfg.listeners.first())
                .map(|l| l.advertised.clone())
        {
            cfg.advertised_listener = adv;
        }

        if let Some(max) = self.max_connections
            && cfg.max_connections == defaults.max_connections
        {
            cfg.max_connections = max;
        }
        if let Some(max) = self.max_connections_per_ip
            && cfg.max_connections_per_ip == defaults.max_connections_per_ip
        {
            cfg.max_connections_per_ip = max;
        }
        if let Some(proto) = self.controller_listener_protocol
            && cfg.controller_listener_protocol == defaults.controller_listener_protocol
        {
            cfg.controller_listener_protocol = proto;
        }
        if let Some(tls) = self.tls_config
            && cfg.tls_config.is_none()
        {
            use crabka_security::{ClientAuthMode, TlsConfig as BrokerTlsConfig};
            cfg.tls_config = Some(BrokerTlsConfig {
                cert_chain_path: tls.cert_path,
                private_key_path: tls.key_path,
                trust_roots_path: None,
                client_ca_path: tls.client_ca_path,
                client_auth: match tls.client_auth {
                    FileClientAuthMode::Disabled => ClientAuthMode::Disabled,
                    FileClientAuthMode::Optional => ClientAuthMode::Optional,
                    FileClientAuthMode::Required => ClientAuthMode::Required,
                },
            });
        }

        if let Some(oauth) = self.oauthbearer {
            Self::apply_oauthbearer(oauth, cfg)?;
        }

        // The environment variable takes precedence over the TOML value.
        let env_key = std::env::var("CRABKA_DELEGATION_TOKEN_SECRET_KEY").ok();
        let toml_key = self
            .delegation_token
            .as_ref()
            .and_then(|d| d.secret_key.clone());
        if let Some(k) = env_key.or(toml_key) {
            cfg.delegation_token_secret_key =
                Some(crabka_security::SecretBytes::new(k.into_bytes()));
        }
        if let Some(d) = &self.delegation_token {
            if let Some(ms) = d.max_lifetime_ms {
                cfg.delegation_token_max_lifetime_ms = ms;
            }
            if let Some(ms) = d.expiry_check_interval_ms {
                cfg.delegation_token_expiry_check_interval_ms = ms;
            }
            if let Some(ms) = d.default_renew_period_ms {
                cfg.delegation_token_default_renew_period_ms = ms;
            }
        }

        // Top-level super users are added to whatever `cfg` already holds.
        if let Some(users) = self.super_users {
            cfg.super_users.extend(users);
        }

        if let Some(rs) = &self.remote_storage {
            match (&rs.storage_dir, &rs.s3) {
                (Some(_), Some(_)) => {
                    return Err(FileConfigError::InvalidConfig(
                        "[remote_storage] cannot set both `storage_dir` (local) \
                         and `[remote_storage.s3]` (object store)"
                            .into(),
                    ));
                }
                (Some(dir), None) => {
                    cfg.remote_storage_backend = Some(crate::config::RemoteStorageBackend::Local {
                        dir: std::path::PathBuf::from(dir),
                    });
                }
                (None, Some(s3)) => {
                    cfg.remote_storage_backend = Some(crate::config::RemoteStorageBackend::S3(
                        crabka_remote_storage::S3Config {
                            bucket: s3.bucket.clone(),
                            region: s3.region.clone(),
                            prefix: s3.prefix.clone(),
                            endpoint: s3.endpoint.clone(),
                            access_key_id: s3.access_key_id.clone(),
                            secret_access_key: s3.secret_access_key.clone(),
                            allow_http: s3.allow_http,
                            multipart_threshold: s3
                                .multipart_threshold
                                .unwrap_or(crabka_remote_storage::DEFAULT_MULTIPART_THRESHOLD),
                            multipart_chunk_size: s3
                                .multipart_chunk_size
                                .unwrap_or(crabka_remote_storage::DEFAULT_MULTIPART_CHUNK_SIZE),
                        },
                    ));
                }
                (None, None) => {}
            }
            // Metadata settings only matter once some backend is configured (either
            // by this file or already present in `cfg`).
            if cfg.remote_storage_backend.is_some() {
                let km = rs.kafka_metadata.as_ref();
                if km.is_some_and(|k| k.in_memory) {
                    cfg.remote_log_metadata = crate::config::RlmmKind::InMemory;
                } else {
                    cfg.remote_log_metadata =
                        crate::config::RlmmKind::TopicBacked(crate::config::KafkaRlmmConfig {
                            bootstrap: km.map(|k| k.bootstrap.clone()).unwrap_or_default(),
                            num_partitions: km.and_then(|k| k.num_partitions).unwrap_or(50),
                            replication: km.and_then(|k| k.replication).unwrap_or(3),
                            snapshot_interval: crate::config::DEFAULT_RLMM_SNAPSHOT_INTERVAL,
                            snapshot_dir: cfg.log_dir.join("remote-log-metadata"),
                            security: None,
                        });
                }
            }
        }

        if let Some(a) = self.authorization.as_ref() {
            let auth_super_users: std::collections::HashSet<String> =
                a.super_users.iter().cloned().collect();
            // NOTE(review): this replaces the whole set, discarding the top-level
            // `super_users` merged above and anything `cfg` held before. Confirm that
            // `[authorization].super_users` is meant to be authoritative.
            cfg.super_users.clone_from(&auth_super_users);
            cfg.authorizer = match a.authz_type {
                AuthzType::AllowAll => std::sync::Arc::new(crate::authorizer::AllowAllAuthorizer),
                AuthzType::Simple => std::sync::Arc::new(
                    crate::authorizer::SimpleAclAuthorizer::new(auth_super_users),
                ),
                AuthzType::Opa => {
                    let opa = a.opa.as_ref().ok_or_else(|| {
                        FileConfigError::MissingSection("[authorization.opa]".into())
                    })?;
                    let built = crate::authorizer::opa::OpaAuthorizer::new(
                        auth_super_users,
                        opa.url.clone(),
                        opa.allow_on_error,
                        opa.maximum_cache_size,
                        opa.expire_after_ms,
                    )
                    .map_err(|e| FileConfigError::OpaConfig(format!("{e:?}")))?;
                    std::sync::Arc::new(built)
                }
            };
        }

        if let Some(p) = &self.process
            && !p.roles.is_empty()
        {
            let mut roles = Vec::with_capacity(p.roles.len());
            for r in &p.roles {
                let role = match r.to_ascii_lowercase().as_str() {
                    "controller" => crate::config::NodeRole::Controller,
                    "broker" => crate::config::NodeRole::Broker,
                    other => {
                        return Err(FileConfigError::InvalidConfig(format!(
                            "unknown process.role `{other}` (expected `controller` or `broker`)"
                        )));
                    }
                };
                roles.push(role);
            }
            cfg.roles = roles;
        }

        if let Some(g) = self.gssapi {
            let mut rules = Vec::with_capacity(g.principal_to_local_rules.len());
            for spec in &g.principal_to_local_rules {
                let rule = crabka_security::gssapi::name::Rule::parse(spec).map_err(|e| {
                    FileConfigError::InvalidConfig(format!(
                        "[gssapi]: invalid principal_to_local rule {spec:?}: {e}"
                    ))
                })?;
                rules.push(rule);
            }
            cfg.gssapi = Some(crabka_security::gssapi::GssapiConfig {
                keytab_path: g.keytab_path,
                service_name: g
                    .service_name
                    .unwrap_or_else(|| DEFAULT_KERBEROS_SERVICE_NAME.to_string()),
                principal_to_local_rules: rules,
                realm: g.realm,
                kdc: g.kdc,
            });
        }

        if let Some(ib) = self.inter_broker_credentials {
            cfg.inter_broker_credentials = Some(match ib {
                FileInterBrokerCredentials::Gssapi {
                    keytab_path,
                    client_principal,
                    service_name,
                    kdc_url,
                } => crate::config::InterBrokerCredentials::Gssapi {
                    keytab_path,
                    client_principal,
                    service_name: service_name
                        .unwrap_or_else(|| DEFAULT_KERBEROS_SERVICE_NAME.to_string()),
                    kdc_url,
                },
            });
        }
        Ok(())
    }

    /// Applies the `[oauthbearer]` section: copies the shared settings onto `cfg` and
    /// installs the validator selected by the endpoint keys (JWKS → signed,
    /// introspection → introspection, neither → unsecured).
    ///
    /// Invalid user input is reported as [`FileConfigError::InvalidConfig`] rather
    /// than by panicking, so the caller decides how a bad file is surfaced.
    #[allow(clippy::too_many_lines)] // three validator variants, each a flat field mapping
    fn apply_oauthbearer(
        oauth: FileOAuthBearerConfig,
        cfg: &mut crate::config::BrokerConfig,
    ) -> Result<(), FileConfigError> {
        cfg.oauthbearer_idp_tls_trust
            .clone_from(&oauth.idp_tls_trust);
        cfg.oauthbearer_max_session_lifetime_seconds = oauth.max_session_lifetime_seconds;

        // Compile both JsonPath expressions up front so a bad expression is rejected
        // regardless of which validator ends up selected.
        let custom_claim_check_compiled = oauth
            .custom_claim_check
            .as_deref()
            .map(|expr| {
                jsonpath_rust::parser::parse_json_path(expr).map_err(|e| {
                    FileConfigError::InvalidConfig(format!(
                        "[oauthbearer]: invalid custom_claim_check JsonPath expression {expr:?}: {e}"
                    ))
                })
            })
            .transpose()?;
        let groups_claim_compiled = oauth
            .groups_claim
            .as_deref()
            .map(|expr| {
                jsonpath_rust::parser::parse_json_path(expr).map_err(|e| {
                    FileConfigError::InvalidConfig(format!(
                        "[oauthbearer]: invalid groups_claim JsonPath expression {expr:?}: {e}"
                    ))
                })
            })
            .transpose()?;

        match (
            oauth.jwks_endpoint_uri.as_ref(),
            oauth.introspection_endpoint_uri.as_ref(),
        ) {
            (Some(_), Some(_)) => {
                return Err(FileConfigError::InvalidConfig(
                    "[oauthbearer]: jwks_endpoint_uri and introspection_endpoint_uri are mutually exclusive; configure exactly one"
                        .into(),
                ));
            }
            (Some(jwks_uri), None) => {
                // Own the URI immediately so the borrow of `oauth` ends before its
                // other fields are moved out below.
                let jwks_uri = jwks_uri.clone();
                let (signal_tx, signal_rx) = tokio::sync::mpsc::channel::<()>(1);
                let last_successful = std::sync::Arc::new(std::sync::atomic::AtomicI64::new(0));
                let last_on_demand = std::sync::Arc::new(std::sync::atomic::AtomicI64::new(0));
                let handle = crabka_security::JwksHandle::new_with_refresher_handles(
                    crabka_security::Jwks::empty(),
                    last_successful.clone(),
                    signal_tx,
                );
                let mut v = crabka_security::SignedJwsValidator::new(handle);
                if let Some(name) = oauth.principal_claim_name {
                    v.principal_claim_name = name;
                }
                if let Some(skew) = oauth.allowable_clock_skew_ms {
                    v.allowable_clock_skew_ms = skew;
                }
                v.valid_issuer = oauth.valid_issuer_uri;
                v.expected_audience = oauth.expected_audience;
                v.custom_claim_check
                    .clone_from(&custom_claim_check_compiled);
                v.valid_token_type.clone_from(&oauth.valid_token_type);
                v.fallback_user_name_claim
                    .clone_from(&oauth.fallback_user_name_claim);
                v.fallback_user_name_prefix
                    .clone_from(&oauth.fallback_user_name_prefix);
                v.groups_claim.clone_from(&groups_claim_compiled);
                v.groups_claim_delimiter
                    .clone_from(&oauth.groups_claim_delimiter);
                v.expiry_ms = oauth.jwks_expiry_seconds.map(|s| i64::from(s) * 1000);
                cfg.oauthbearer_validator = crabka_security::OAuthBearerValidator::Signed(v);
                cfg.oauthbearer_jwks_endpoint = Some(jwks_uri);
                if let Some(ms) = oauth.jwks_refresh_interval_ms {
                    cfg.oauthbearer_jwks_refresh_interval =
                        std::time::Duration::from_millis(ms);
                }
                *cfg.oauthbearer_jwks_signal_rx.lock().unwrap() = Some(signal_rx);
                cfg.oauthbearer_jwks_last_successful_fetch_ms = last_successful;
                cfg.oauthbearer_jwks_last_on_demand_refresh_ms = last_on_demand;
                cfg.oauthbearer_jwks_min_on_demand_pause = std::time::Duration::from_secs(
                    u64::from(oauth.jwks_min_refresh_pause_seconds.unwrap_or(1)),
                );
                cfg.oauthbearer_jwks_ignore_key_use =
                    oauth.jwks_ignore_key_use.unwrap_or(false);
            }
            (None, Some(introspect_uri)) => {
                let client_id = oauth.introspection_client_id.clone().ok_or_else(|| {
                    FileConfigError::InvalidConfig(
                        "[oauthbearer]: introspection_endpoint_uri set but introspection_client_id is missing"
                            .into(),
                    )
                })?;
                let secret_path = oauth
                    .introspection_client_secret_path
                    .clone()
                    .ok_or_else(|| {
                        FileConfigError::InvalidConfig(
                            "[oauthbearer]: introspection_endpoint_uri set but introspection_client_secret_path is missing"
                                .into(),
                        )
                    })?;
                // Secret files commonly end with a newline; strip trailing line
                // terminators only, leaving any other whitespace intact.
                let client_secret = std::fs::read_to_string(&secret_path)
                    .map_err(|e| {
                        FileConfigError::InvalidConfig(format!(
                            "[oauthbearer]: failed to read introspection_client_secret_path {}: {}",
                            secret_path.display(),
                            e
                        ))
                    })?
                    .trim_end_matches(['\n', '\r'])
                    .to_string();
                let timeout = std::time::Duration::from_millis(
                    oauth.introspection_http_timeout_ms.unwrap_or(10_000),
                );
                let client = crate::oauth_introspection::ReqwestIntrospectionClient::new(
                    introspect_uri.clone(),
                    oauth.userinfo_endpoint_uri.clone(),
                    client_id,
                    client_secret,
                    oauth.idp_tls_trust.as_deref(),
                    timeout,
                )
                .map_err(|e| {
                    FileConfigError::InvalidConfig(format!(
                        "[oauthbearer]: failed to build introspection client: {e}"
                    ))
                })?;
                let v = crabka_security::IntrospectionValidator {
                    client,
                    principal_claim_name: oauth
                        .principal_claim_name
                        .clone()
                        .unwrap_or_else(|| "sub".into()),
                    custom_claim_check: custom_claim_check_compiled.clone(),
                    call_userinfo: oauth.userinfo_endpoint_uri.is_some(),
                    allowable_clock_skew_ms: oauth.allowable_clock_skew_ms.unwrap_or(30_000),
                    expected_audience: oauth.expected_audience.clone(),
                    fallback_user_name_claim: oauth.fallback_user_name_claim.clone(),
                    fallback_user_name_prefix: oauth.fallback_user_name_prefix.clone(),
                    groups_claim: groups_claim_compiled.clone(),
                    groups_claim_delimiter: oauth.groups_claim_delimiter.clone(),
                };
                cfg.oauthbearer_validator =
                    crabka_security::OAuthBearerValidator::Introspection(v);
            }
            (None, None) => {
                let mut v = crabka_security::UnsecuredJwsValidator::default();
                if let Some(name) = oauth.principal_claim_name {
                    v.principal_claim_name = name;
                }
                if let Some(skew) = oauth.allowable_clock_skew_ms {
                    v.allowable_clock_skew_ms = skew;
                }
                v.custom_claim_check = custom_claim_check_compiled;
                v.valid_token_type.clone_from(&oauth.valid_token_type);
                v.fallback_user_name_claim = oauth.fallback_user_name_claim;
                v.fallback_user_name_prefix = oauth.fallback_user_name_prefix;
                v.groups_claim = groups_claim_compiled;
                v.groups_claim_delimiter = oauth.groups_claim_delimiter;
                cfg.oauthbearer_validator = crabka_security::OAuthBearerValidator::Unsecured(v);
            }
        }
        Ok(())
    }
}
impl FileListener {
#[must_use]
pub fn into_spec(self) -> ListenerSpec {
use crabka_security::{ClientAuthMode, TlsConfig as BrokerTlsConfig};
ListenerSpec {
name: self.name,
bind_addr: self.bind_addr,
advertised: self.advertised,
protocol: self.protocol,
tls_config: self.tls_config.map(|t| BrokerTlsConfig {
cert_chain_path: t.cert_path,
private_key_path: t.key_path,
trust_roots_path: None,
client_ca_path: t.client_ca_path,
client_auth: match t.client_auth {
FileClientAuthMode::Disabled => ClientAuthMode::Disabled,
FileClientAuthMode::Optional => ClientAuthMode::Optional,
FileClientAuthMode::Required => ClientAuthMode::Required,
},
}),
sasl_mechanisms: self.sasl_config.map(|s| s.enabled_mechanisms),
}
}
}
// Parsing tests for per-listener TLS/SASL settings and the top-level `[tls_config]` table.
#[cfg(test)]
mod listener_auth_tests {
use super::*;
use assert2::assert;
// An inline `tls_config` table on one listener is parsed for that listener only;
// the other listener keeps `tls_config == None`.
#[test]
fn file_listener_parses_per_listener_tls_config_inline() {
let toml = r#"
broker_id = 0
log_dir = "/tmp"
inter_broker_listener_name = "internal"
[[listeners]]
name = "internal"
bind_addr = "0.0.0.0:9092"
advertised = "localhost:9092"
protocol = "Plaintext"
[[listeners]]
name = "data"
bind_addr = "0.0.0.0:9094"
advertised = "localhost:9094"
protocol = "Ssl"
tls_config = { cert_path = "/tls/broker.crt", key_path = "/tls/broker.key", client_ca_path = "/tls/clients-ca.crt", client_auth = "Required" }
"#;
let cfg: FileConfig = toml::from_str(toml).unwrap();
assert!(cfg.listeners.len() == 2);
assert!(cfg.listeners[0].tls_config.is_none());
let data_tls = cfg.listeners[1].tls_config.as_ref().unwrap();
assert!(data_tls.cert_path == std::path::PathBuf::from("/tls/broker.crt"));
assert!(data_tls.key_path == std::path::PathBuf::from("/tls/broker.key"));
assert!(
data_tls.client_ca_path.as_deref() == Some(std::path::Path::new("/tls/clients-ca.crt"))
);
assert!(data_tls.client_auth == FileClientAuthMode::Required);
}
// An inline `sasl_config` mechanism name is converted to the `SaslMechanism` enum.
#[test]
fn file_listener_parses_per_listener_sasl_config_inline() {
let toml = r#"
broker_id = 0
log_dir = "/tmp"
inter_broker_listener_name = "internal"
[[listeners]]
name = "scram"
bind_addr = "0.0.0.0:9094"
advertised = "localhost:9094"
protocol = "SaslSsl"
tls_config = { cert_path = "/tls/c", key_path = "/tls/k", client_auth = "Disabled" }
sasl_config = { enabled_mechanisms = ["SCRAM-SHA-512"] }
"#;
let cfg: FileConfig = toml::from_str(toml).unwrap();
let sasl = cfg.listeners[0].sasl_config.as_ref().unwrap();
assert!(sasl.enabled_mechanisms == vec![crabka_security::SaslMechanism::ScramSha512]);
}
// The top-level `[tls_config]` table still parses and does not leak into the
// listener's own `tls_config`.
#[test]
fn top_level_tls_config_still_parses_back_compat() {
let toml = r#"
broker_id = 0
log_dir = "/tmp"
inter_broker_listener_name = "internal"
controller_listener_protocol = "Ssl"
[[listeners]]
name = "internal"
bind_addr = "0.0.0.0:9092"
advertised = "localhost:9092"
protocol = "Plaintext"
[tls_config]
cert_path = "/tls/c"
key_path = "/tls/k"
client_ca_path = "/tls/clients-ca"
client_auth = "Required"
"#;
let cfg: FileConfig = toml::from_str(toml).unwrap();
assert!(cfg.tls_config.is_some());
assert!(cfg.listeners[0].tls_config.is_none());
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert2::assert;
use std::sync::{Mutex, OnceLock};
static ENV_LOCK_CELL: OnceLock<Mutex<()>> = OnceLock::new();
/// Returns the process-wide mutex stored in `ENV_LOCK_CELL`, creating it on first use.
/// Presumably held by tests that touch process environment variables — the callers
/// are outside this view.
fn env_lock() -> &'static Mutex<()> {
    ENV_LOCK_CELL.get_or_init(Mutex::default)
}
// The hand-written `Debug` impl must hide both credential values (showing `***`)
// while still printing non-sensitive fields such as bucket and region.
#[test]
fn s3_config_debug_redacts_credentials() {
let cfg = FileRemoteStorageS3Config {
bucket: "logs".to_string(),
region: "us-east-1".to_string(),
prefix: None,
endpoint: None,
access_key_id: Some("AKIAEXAMPLEKEYID".to_string()),
secret_access_key: Some("super-secret-key-value".to_string()),
allow_http: false,
multipart_threshold: None,
multipart_chunk_size: None,
};
let dbg = format!("{cfg:?}");
assert!(!dbg.contains("super-secret-key-value"));
assert!(!dbg.contains("AKIAEXAMPLEKEYID"));
assert!(dbg.contains("***"));
assert!(dbg.contains("logs"));
assert!(dbg.contains("us-east-1"));
}
// An empty document parses to `FileConfig::default()`: every key is optional.
#[test]
fn empty_toml_round_trips() {
let cfg: FileConfig = toml::from_str("").unwrap();
assert!(cfg == FileConfig::default());
}
// A document with scalars, two listeners and a `[server_properties]` table parses
// into the expected field values.
#[test]
fn full_toml_round_trips() {
let src = r#"
broker_id = 0
log_dir = "/var/lib/crabka/data"
inter_broker_listener_name = "PLAIN"
[[listeners]]
name = "PLAIN"
bind_addr = "0.0.0.0:9092"
advertised = "demo-0:9092"
protocol = "Plaintext"
[[listeners]]
name = "EXTERNAL"
bind_addr = "0.0.0.0:9094"
advertised = "10.0.1.5:32100"
protocol = "Plaintext"
[server_properties]
"log.retention.hours" = "24"
"#;
let cfg: FileConfig = toml::from_str(src).unwrap();
assert!(cfg.broker_id == Some(0));
assert!(cfg.log_dir.as_deref() == Some("/var/lib/crabka/data"));
assert!(cfg.inter_broker_listener_name.as_deref() == Some("PLAIN"));
assert!(cfg.listeners.len() == 2);
assert!(cfg.listeners[0].name == "PLAIN");
assert!(cfg.listeners[0].protocol == ListenerProtocol::Plaintext);
assert!(
cfg.server_properties
.get("log.retention.hours")
.map(String::as_str)
== Some("24")
);
}
// `FileConfig` does not deny unknown fields, so an unrecognized top-level key is
// skipped instead of failing the parse.
#[test]
fn unknown_top_level_key_is_ignored() {
let src = r#"
broker_id = 0
some_future_field = "from-a-later-slice"
"#;
let cfg: FileConfig = toml::from_str(src).unwrap();
assert!(cfg.broker_id == Some(0));
}
// The protocol value `"SaslSsl"` parses to `ListenerProtocol::SaslSsl`.
// NOTE(review): despite the test name, the accepted spelling here is the
// PascalCase variant name, not a snake_case one.
#[test]
fn snake_case_protocol_names() {
let src = r#"
[[listeners]]
name = "S"
bind_addr = "0.0.0.0:9094"
advertised = "h:9094"
protocol = "SaslSsl"
"#;
let cfg: FileConfig = toml::from_str(src).unwrap();
assert!(cfg.listeners[0].protocol == ListenerProtocol::SaslSsl);
}
// A `bind_addr` that is not a socket address fails deserialization, and the error
// text mentions either the field name or "socket".
#[test]
fn invalid_bind_addr_is_an_error() {
let src = r#"
[[listeners]]
name = "X"
bind_addr = "not-a-socket-address"
advertised = "h:9094"
protocol = "Plaintext"
"#;
let err = toml::from_str::<FileConfig>(src).unwrap_err();
assert!(
err.to_string().contains("bind_addr") || err.to_string().contains("socket"),
"unexpected error: {err}"
);
}
// `into_spec` carries name, advertised address and protocol across unchanged.
#[test]
fn file_listener_into_spec_preserves_fields() {
let fl = FileListener {
name: "X".into(),
bind_addr: "0.0.0.0:9094".parse().unwrap(),
advertised: "h:9094".into(),
protocol: ListenerProtocol::Plaintext,
tls_config: None,
sasl_config: None,
};
let spec = fl.into_spec();
assert!(spec.name == "X");
assert!(spec.advertised == "h:9094");
assert!(spec.protocol == ListenerProtocol::Plaintext);
}
// Listeners and the inter-broker listener name from the file end up in the broker config.
#[test]
fn apply_to_populates_listeners() {
use crate::config::BrokerConfig;
let src = r#"
inter_broker_listener_name = "PLAIN"
[[listeners]]
name = "PLAIN"
bind_addr = "0.0.0.0:9092"
advertised = "demo-0:9092"
protocol = "Plaintext"
"#;
let file: FileConfig = toml::from_str(src).unwrap();
let mut cfg = BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(cfg.listeners.len() == 1);
assert!(cfg.listeners[0].name == "PLAIN");
assert!(cfg.listeners[0].advertised == "demo-0:9092");
assert!(cfg.inter_broker_listener_name == "PLAIN");
}
// Both connection caps from the file are applied onto a default broker config.
#[test]
fn apply_to_maps_connection_caps() {
use crate::config::BrokerConfig;
let src = r"
max_connections = 100
max_connections_per_ip = 8
";
let file: FileConfig = toml::from_str(src).unwrap();
assert!(file.max_connections == Some(100));
assert!(file.max_connections_per_ip == Some(8));
let mut cfg = BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(cfg.max_connections == 100);
assert!(cfg.max_connections_per_ip == 8);
}
// When the file omits the caps, the broker config keeps `usize::MAX` for both.
#[test]
fn apply_to_omitted_connection_caps_keep_default_unlimited() {
use crate::config::BrokerConfig;
let file: FileConfig = toml::from_str("broker_id = 0").unwrap();
assert!(file.max_connections == None);
let mut cfg = BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(cfg.max_connections == usize::MAX);
assert!(cfg.max_connections_per_ip == usize::MAX);
}
// A broker id already set to a non-default value wins over the file's value.
#[test]
fn apply_to_does_not_clobber_non_default_broker_id() {
use crate::config::BrokerConfig;
let src = r"broker_id = 42";
let file: FileConfig = toml::from_str(src).unwrap();
let mut cfg = BrokerConfig {
broker_id: 7,
..BrokerConfig::default()
};
file.apply_to(&mut cfg).unwrap();
assert!(cfg.broker_id == 7);
}
// With the broker id still at its default, the file's value is applied.
#[test]
fn apply_to_fills_in_default_broker_id() {
use crate::config::BrokerConfig;
let src = r"broker_id = 42";
let file: FileConfig = toml::from_str(src).unwrap();
let mut cfg = BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(cfg.broker_id == 42);
}
// The top-level TLS keys and `controller_listener_protocol` parse into their typed fields.
#[test]
fn tls_keys_round_trip() {
let src = r#"
controller_listener_protocol = "Ssl"
[tls_config]
cert_path = "/etc/crabka/broker-tls/0.crt"
key_path = "/etc/crabka/broker-tls/0.key"
client_ca_path = "/etc/crabka/cluster-ca/ca.crt"
client_auth = "Required"
"#;
let cfg: FileConfig = toml::from_str(src).expect("parse TLS config");
assert!(cfg.controller_listener_protocol == Some(ListenerProtocol::Ssl));
let tls = cfg.tls_config.expect("tls_config present");
assert!(tls.cert_path == std::path::PathBuf::from("/etc/crabka/broker-tls/0.crt"));
assert!(tls.client_auth == FileClientAuthMode::Required);
}
// A file without any TLS keys leaves both TLS-related fields as `None`.
#[test]
fn tls_keys_absent_round_trips() {
let src = r#"
broker_id = 0
[[listeners]]
name = "PLAIN"
bind_addr = "0.0.0.0:9092"
advertised = "demo-0:9092"
protocol = "Plaintext"
"#;
let cfg: FileConfig = toml::from_str(src).expect("parse no-TLS");
assert!(cfg.controller_listener_protocol == None);
assert!(cfg.tls_config.is_none());
}
// `apply_to` copies the controller listener protocol and the top-level TLS config
// into a default broker config (`cert_path` becomes `cert_chain_path`).
#[test]
fn apply_to_propagates_tls_config() {
let src = r#"
controller_listener_protocol = "Ssl"
[tls_config]
cert_path = "/c"
key_path = "/k"
client_ca_path = "/ca"
client_auth = "Required"
"#;
let file: FileConfig = toml::from_str(src).expect("parse");
let mut cfg = crate::config::BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(cfg.controller_listener_protocol == crabka_security::ListenerProtocol::Ssl);
let tls = cfg.tls_config.expect("tls_config propagated");
assert!(tls.cert_chain_path == std::path::PathBuf::from("/c"));
}
// Setting `jwks_endpoint_uri` selects the signed validator and carries the issuer,
// audience, principal claim, endpoint and refresh interval into the broker config.
#[test]
fn apply_to_oauthbearer_jwks_selects_signed_validator() {
let src = r#"
[oauthbearer]
jwks_endpoint_uri = "https://idp.example/jwks"
valid_issuer_uri = "https://idp.example"
expected_audience = "kafka"
principal_claim_name = "client_id"
jwks_refresh_interval_ms = 60000
"#;
let file: FileConfig = toml::from_str(src).expect("parse");
let mut cfg = crate::config::BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(cfg.oauthbearer_jwks_endpoint.as_deref() == Some("https://idp.example/jwks"));
assert!(cfg.oauthbearer_jwks_refresh_interval == std::time::Duration::from_mins(1));
match cfg.oauthbearer_validator {
crabka_security::OAuthBearerValidator::Signed(v) => {
assert!(v.valid_issuer.as_deref() == Some("https://idp.example"));
assert!(v.expected_audience.as_deref() == Some("kafka"));
assert!(v.principal_claim_name == "client_id");
}
other => panic!("jwks_endpoint_uri must select the Signed validator; got {other:?}"),
}
}
// With neither endpoint configured, the unsecured validator is kept and still
// receives the clock-skew override.
#[test]
fn apply_to_oauthbearer_without_jwks_stays_unsecured() {
let src = r#"
[oauthbearer]
principal_claim_name = "sub"
allowable_clock_skew_ms = 5000
"#;
let file: FileConfig = toml::from_str(src).expect("parse");
let mut cfg = crate::config::BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(cfg.oauthbearer_jwks_endpoint.is_none());
match cfg.oauthbearer_validator {
crabka_security::OAuthBearerValidator::Unsecured(v) => {
assert!(v.allowable_clock_skew_ms == 5000);
}
other => {
panic!("no jwks_endpoint_uri must keep the unsecured validator; got {other:?}")
}
}
}
// `idp_tls_trust` from `[oauthbearer]` is copied to `oauthbearer_idp_tls_trust`.
#[test]
fn apply_to_oauthbearer_threads_idp_tls_trust_to_broker_config() {
let toml = r#"
[oauthbearer]
jwks_endpoint_uri = "https://idp.example/certs"
idp_tls_trust = "/etc/crabka/oauth/idp-ca.pem"
"#;
let file: FileConfig = toml::from_str(toml).unwrap();
let mut cfg = crate::config::BrokerConfig::default();
file.apply_to(&mut cfg).unwrap();
assert!(
cfg.oauthbearer_idp_tls_trust.as_deref()
== Some(std::path::Path::new("/etc/crabka/oauth/idp-ca.pem"))
);
}
/// When `[oauthbearer]` omits `idp_tls_trust`, the corresponding
/// `BrokerConfig` field must remain `None`.
#[test]
fn apply_to_oauthbearer_without_idp_tls_trust_leaves_field_none() {
    let document = r#"
[oauthbearer]
jwks_endpoint_uri = "https://idp.example/certs"
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let trust = &broker.oauthbearer_idp_tls_trust;
    assert!(trust.is_none());
}
/// An `[oauthbearer]` section with an introspection endpoint, client id, and
/// client-secret file must select the `Introspection` validator.
#[test]
fn apply_to_oauthbearer_selects_introspection_validator_when_endpoint_set() {
    // Write a client secret to a temporary file that the config points at.
    let scratch = tempfile::tempdir().unwrap();
    let secret_file = scratch.path().join("client-secret");
    std::fs::write(&secret_file, "the-secret").unwrap();

    let document = format!(
        r#"
[oauthbearer]
introspection_endpoint_uri = "https://idp.example/introspect"
introspection_client_id = "kafka-broker"
introspection_client_secret_path = '{}'
"#,
        secret_file.display()
    );
    let parsed: FileConfig = toml::from_str(&document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let is_introspection = matches!(
        broker.oauthbearer_validator,
        crabka_security::OAuthBearerValidator::Introspection(_)
    );
    assert!(is_introspection);
}
/// Supplying both `jwks_endpoint_uri` and `introspection_endpoint_uri` is
/// rejected: the test is expected to panic with a message containing
/// "mutually exclusive".
#[test]
#[should_panic(expected = "mutually exclusive")]
fn apply_to_oauthbearer_rejects_both_jwks_and_introspection_set() {
    // Write a client secret to a temporary file that the config points at.
    let scratch = tempfile::tempdir().unwrap();
    let secret_file = scratch.path().join("client-secret");
    std::fs::write(&secret_file, "x").unwrap();

    let document = format!(
        r#"
[oauthbearer]
jwks_endpoint_uri = "https://idp.example/jwks"
introspection_endpoint_uri = "https://idp.example/introspect"
introspection_client_id = "id"
introspection_client_secret_path = '{}'
"#,
        secret_file.display()
    );
    let parsed: FileConfig = toml::from_str(&document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();
}
/// An introspection endpoint without `introspection_client_id` is rejected:
/// the test is expected to panic with a message naming
/// `introspection_client_id`.
#[test]
#[should_panic(expected = "introspection_client_id")]
fn apply_to_oauthbearer_introspection_requires_client_id() {
    // Write a client secret to a temporary file that the config points at.
    let scratch = tempfile::tempdir().unwrap();
    let secret_file = scratch.path().join("client-secret");
    std::fs::write(&secret_file, "x").unwrap();

    let document = format!(
        r#"
[oauthbearer]
introspection_endpoint_uri = "https://idp.example/introspect"
introspection_client_secret_path = '{}'
"#,
        secret_file.display()
    );
    let parsed: FileConfig = toml::from_str(&document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();
}
/// An introspection endpoint without `introspection_client_secret_path` is
/// rejected: the test is expected to panic with a message naming
/// `introspection_client_secret_path`.
#[test]
#[should_panic(expected = "introspection_client_secret_path")]
fn apply_to_oauthbearer_introspection_requires_client_secret_path() {
    let document = r#"
[oauthbearer]
introspection_endpoint_uri = "https://idp.example/introspect"
introspection_client_id = "kafka-broker"
"#;
    let parsed = toml::from_str::<FileConfig>(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();
}
/// When `userinfo_endpoint_uri` accompanies the introspection settings, the
/// resulting `Introspection` validator must have `call_userinfo` set.
#[test]
fn apply_to_oauthbearer_introspection_with_userinfo_sets_call_userinfo_true() {
    // Write a client secret to a temporary file that the config points at.
    let scratch = tempfile::tempdir().unwrap();
    let secret_file = scratch.path().join("client-secret");
    std::fs::write(&secret_file, "x").unwrap();

    let document = format!(
        r#"
[oauthbearer]
introspection_endpoint_uri = "https://idp.example/introspect"
userinfo_endpoint_uri = "https://idp.example/userinfo"
introspection_client_id = "id"
introspection_client_secret_path = '{}'
"#,
        secret_file.display()
    );
    let parsed: FileConfig = toml::from_str(&document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    match broker.oauthbearer_validator {
        crabka_security::OAuthBearerValidator::Introspection(introspection) => {
            assert!(introspection.call_userinfo);
        }
        other => panic!("expected Introspection, got {other:?}"),
    }
}
/// Without `userinfo_endpoint_uri`, the resulting `Introspection` validator
/// must have `call_userinfo` cleared.
#[test]
fn apply_to_oauthbearer_introspection_without_userinfo_sets_call_userinfo_false() {
    // Write a client secret to a temporary file that the config points at.
    let scratch = tempfile::tempdir().unwrap();
    let secret_file = scratch.path().join("client-secret");
    std::fs::write(&secret_file, "x").unwrap();

    let document = format!(
        r#"
[oauthbearer]
introspection_endpoint_uri = "https://idp.example/introspect"
introspection_client_id = "id"
introspection_client_secret_path = '{}'
"#,
        secret_file.display()
    );
    let parsed: FileConfig = toml::from_str(&document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    match broker.oauthbearer_validator {
        crabka_security::OAuthBearerValidator::Introspection(introspection) => {
            assert!(!introspection.call_userinfo);
        }
        other => panic!("expected Introspection, got {other:?}"),
    }
}
/// Applying a file with no `listeners` entries must leave a listener that was
/// already present on the `BrokerConfig` untouched.
#[test]
fn apply_to_empty_listeners_does_not_clear_existing() {
    use crate::config::BrokerConfig;

    // The listener that is on the broker config before the file is applied.
    let preexisting = crate::config::ListenerSpec {
        name: "X".into(),
        bind_addr: "0.0.0.0:9094".parse().unwrap(),
        advertised: "h:9094".into(),
        protocol: crabka_security::ListenerProtocol::Plaintext,
        tls_config: None,
        sasl_mechanisms: None,
    };
    let mut broker = BrokerConfig {
        listeners: vec![preexisting],
        ..BrokerConfig::default()
    };

    let empty_file: FileConfig = toml::from_str("").unwrap();
    empty_file.apply_to(&mut broker).unwrap();

    assert!(broker.listeners.len() == 1);
    assert!(broker.listeners[0].name == "X");
}
/// `advertised_listener` must take the advertised address of the listener
/// named by `inter_broker_listener_name` ("PLAIN" here), not that of the
/// first listener in the file.
#[test]
fn apply_to_syncs_advertised_listener_from_inter_broker_listener() {
    use crate::config::BrokerConfig;

    let document = r#"
inter_broker_listener_name = "PLAIN"
[[listeners]]
name = "EXTERNAL"
bind_addr = "0.0.0.0:9094"
advertised = "ext.example.com:9094"
protocol = "Plaintext"
[[listeners]]
name = "PLAIN"
bind_addr = "0.0.0.0:9092"
advertised = "demo-0.demo-broker-headless.default.svc.cluster.local:9092"
protocol = "Plaintext"
"#;
    let parsed = toml::from_str::<FileConfig>(document).expect("parse");
    let mut broker = BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let advertised = &broker.advertised_listener;
    assert!(*advertised == "demo-0.demo-broker-headless.default.svc.cluster.local:9092");
    // Spell out that the EXTERNAL listener's address was not picked.
    assert!(*advertised != "ext.example.com:9094");
}
/// A `[remote_storage]` section with only `storage_dir` must produce the
/// `Local` remote-storage backend rooted at that directory.
#[test]
fn remote_storage_section_enables_and_sets_dir() {
    let document = r#"
[remote_storage]
storage_dir = "/var/lib/crabka/tier"
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let expected_dir = std::path::PathBuf::from("/var/lib/crabka/tier");
    match broker.remote_storage_backend {
        Some(crate::config::RemoteStorageBackend::Local { dir }) => {
            assert!(dir == expected_dir);
        }
        other => panic!("expected Local backend, got {other:?}"),
    }
}
/// With no `[remote_storage]` section, the backend must stay `None` and the
/// remote log metadata kind must be `TopicBacked`.
#[test]
fn no_remote_storage_section_leaves_backend_none() {
    let parsed = toml::from_str::<FileConfig>("broker_id = 1").unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    assert!(broker.remote_storage_backend.is_none());
    let is_topic_backed = matches!(
        broker.remote_log_metadata,
        crate::config::RlmmKind::TopicBacked(_)
    );
    assert!(is_topic_backed);
}
/// A `[remote_storage.kafka_metadata]` section that only sets `bootstrap`
/// must yield a `TopicBacked` config with 50 partitions and replication 3.
#[test]
fn kafka_metadata_section_parses_with_defaults() {
    use crate::config::RlmmKind;

    let document = r#"
[remote_storage]
storage_dir = "/tmp/tier"
[remote_storage.kafka_metadata]
bootstrap = "127.0.0.1:9092"
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let topic_backed = match &broker.remote_log_metadata {
        RlmmKind::TopicBacked(inner) => inner.clone(),
        RlmmKind::InMemory => panic!("expected TopicBacked"),
    };
    assert!(topic_backed.bootstrap == "127.0.0.1:9092");
    assert!(topic_backed.num_partitions == 50);
    assert!(topic_backed.replication == 3);
}
/// Explicit `num_partitions` and `replication` values under
/// `[remote_storage.kafka_metadata]` must reach the `TopicBacked` config.
#[test]
fn kafka_metadata_section_honors_overrides() {
    use crate::config::RlmmKind;

    let document = r#"
[remote_storage]
storage_dir = "/tmp/tier"
[remote_storage.kafka_metadata]
bootstrap = "broker-0:9094"
num_partitions = 8
replication = 1
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let topic_backed = match &broker.remote_log_metadata {
        RlmmKind::TopicBacked(inner) => inner.clone(),
        RlmmKind::InMemory => panic!("expected TopicBacked"),
    };
    assert!(topic_backed.bootstrap == "broker-0:9094");
    assert!(topic_backed.num_partitions == 8);
    assert!(topic_backed.replication == 1);
}
/// A `[remote_storage.s3]` section must produce the `S3` backend with every
/// supplied field carried over, no access key, and the crate-default
/// multipart threshold and chunk size.
#[test]
fn remote_storage_s3_section_parses() {
    let document = r#"
[remote_storage.s3]
bucket = "crabka-prod"
region = "us-east-1"
prefix = "cluster-a"
endpoint = "http://minio:9000"
allow_http = true
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    match broker.remote_storage_backend {
        Some(crate::config::RemoteStorageBackend::S3(bucket_cfg)) => {
            // Values taken verbatim from the file.
            assert!(bucket_cfg.bucket == "crabka-prod");
            assert!(bucket_cfg.region == "us-east-1");
            assert!(bucket_cfg.prefix.as_deref() == Some("cluster-a"));
            assert!(bucket_cfg.endpoint.as_deref() == Some("http://minio:9000"));
            assert!(bucket_cfg.allow_http);
            // Values the file left out.
            assert!(bucket_cfg.access_key_id.is_none());
            assert!(
                bucket_cfg.multipart_threshold
                    == crabka_remote_storage::DEFAULT_MULTIPART_THRESHOLD
            );
            assert!(
                bucket_cfg.multipart_chunk_size
                    == crabka_remote_storage::DEFAULT_MULTIPART_CHUNK_SIZE
            );
        }
        other => panic!("expected S3 backend, got {other:?}"),
    }
}
/// Explicit `multipart_threshold` and `multipart_chunk_size` values under
/// `[remote_storage.s3]` must reach the `S3` backend config unchanged.
#[test]
fn remote_storage_s3_section_round_trips_multipart_overrides() {
    let document = r#"
[remote_storage.s3]
bucket = "b"
region = "us-east-1"
multipart_threshold = 8192
multipart_chunk_size = 5242880
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    match broker.remote_storage_backend {
        Some(crate::config::RemoteStorageBackend::S3(bucket_cfg)) => {
            assert!(bucket_cfg.multipart_threshold == 8192);
            assert!(bucket_cfg.multipart_chunk_size == 5_242_880);
        }
        other => panic!("expected S3 backend, got {other:?}"),
    }
}
/// Configuring both `storage_dir` and `[remote_storage.s3]` must make
/// `apply_to` fail with an error whose text contains "cannot set both".
#[test]
fn remote_storage_local_and_s3_together_rejected() {
    let document = r#"
[remote_storage]
storage_dir = "/tmp/tier"
[remote_storage.s3]
bucket = "b"
region = "us-east-1"
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();

    let rendered = parsed.apply_to(&mut broker).unwrap_err().to_string();
    assert!(
        rendered.contains("cannot set both"),
        "expected backend-conflict error, got: {rendered}"
    );
}
/// With `CRABKA_DELEGATION_TOKEN_SECRET_KEY` unset, `[delegation_token].secret_key`
/// must become the broker's secret key, and the lifetime, expiry-check interval,
/// and renew period must keep their defaults (7 days, 1 hour, 24 hours).
#[test]
fn delegation_token_section_parses_secret_key_and_defaults() {
    // Take the guard even when the lock is poisoned: a panic in another
    // env-guarded test must not cascade into a `PoisonError` failure here and
    // bury the original failure.
    let _env_guard = env_lock()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    temp_env::with_var_unset("CRABKA_DELEGATION_TOKEN_SECRET_KEY", || {
        let document = r#"
[delegation_token]
secret_key = "abcdef"
"#;
        let parsed: FileConfig = toml::from_str(document).unwrap();
        let mut broker = crate::config::BrokerConfig::default();
        parsed.apply_to(&mut broker).unwrap();

        let secret_bytes = broker
            .delegation_token_secret_key
            .as_ref()
            .map(|s| s.as_bytes().to_vec());
        assert!(secret_bytes == Some(b"abcdef".to_vec()));
        assert!(broker.delegation_token_max_lifetime_ms == 7 * 24 * 60 * 60 * 1_000);
        assert!(broker.delegation_token_expiry_check_interval_ms == 60 * 60 * 1_000);
        assert!(broker.delegation_token_default_renew_period_ms == 24 * 60 * 60 * 1_000);
    });
}
/// `default_renew_period_ms` must stay at the 24-hour default when the file
/// omits it, and must take the file's value when the file sets it.
#[test]
fn delegation_token_default_renew_period_ms_default_and_override() {
    // Take the guard even when the lock is poisoned: a panic in another
    // env-guarded test must not cascade into a `PoisonError` failure here and
    // bury the original failure.
    let _env_guard = env_lock()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    temp_env::with_var_unset("CRABKA_DELEGATION_TOKEN_SECRET_KEY", || {
        // Case 1: key absent from the file, so the default must survive.
        let without_override = r#"
[delegation_token]
secret_key = "abcdef"
"#;
        let parsed: FileConfig = toml::from_str(without_override).unwrap();
        let mut broker = crate::config::BrokerConfig::default();
        parsed.apply_to(&mut broker).unwrap();
        assert!(
            broker.delegation_token_default_renew_period_ms == 24 * 60 * 60 * 1_000,
            "absent default_renew_period_ms should leave the 24h default in place"
        );

        // Case 2: key present in the file, so the file's value must win.
        let with_override = r#"
[delegation_token]
secret_key = "abcdef"
default_renew_period_ms = 7200000
"#;
        let parsed: FileConfig = toml::from_str(with_override).unwrap();
        let mut broker = crate::config::BrokerConfig::default();
        parsed.apply_to(&mut broker).unwrap();
        assert!(
            broker.delegation_token_default_renew_period_ms == 7_200_000,
            "TOML default_renew_period_ms must override the default"
        );
    });
}
/// When `CRABKA_DELEGATION_TOKEN_SECRET_KEY` is set, its value must win over
/// `[delegation_token].secret_key` from the file.
#[test]
fn delegation_token_env_var_overrides_toml() {
    // Take the guard even when the lock is poisoned: a panic in another
    // env-guarded test must not cascade into a `PoisonError` failure here and
    // bury the original failure.
    let _env_guard = env_lock()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    temp_env::with_var(
        "CRABKA_DELEGATION_TOKEN_SECRET_KEY",
        Some("env-wins"),
        || {
            let document = r#"
[delegation_token]
secret_key = "toml-loses"
"#;
            let parsed: FileConfig = toml::from_str(document).unwrap();
            let mut broker = crate::config::BrokerConfig::default();
            parsed.apply_to(&mut broker).unwrap();

            let secret_bytes = broker
                .delegation_token_secret_key
                .as_ref()
                .map(|s| s.as_bytes().to_vec());
            assert!(secret_bytes == Some(b"env-wins".to_vec()));
        },
    );
}
/// With neither the env var nor a `[delegation_token]` section, the secret key
/// must stay `None` and the three timing fields must keep their defaults
/// (7 days, 1 hour, 24 hours).
#[test]
fn delegation_token_absent_when_unset_anywhere() {
    // Take the guard even when the lock is poisoned: a panic in another
    // env-guarded test must not cascade into a `PoisonError` failure here and
    // bury the original failure.
    let _env_guard = env_lock()
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    temp_env::with_var_unset("CRABKA_DELEGATION_TOKEN_SECRET_KEY", || {
        let parsed: FileConfig = toml::from_str("").unwrap();
        let mut broker = crate::config::BrokerConfig::default();
        parsed.apply_to(&mut broker).unwrap();

        assert!(broker.delegation_token_secret_key.is_none());
        assert!(broker.delegation_token_max_lifetime_ms == 7 * 24 * 60 * 60 * 1_000);
        assert!(broker.delegation_token_expiry_check_interval_ms == 60 * 60 * 1_000);
        assert!(broker.delegation_token_default_renew_period_ms == 24 * 60 * 60 * 1_000);
    });
}
/// Top-level `super_users` entries must populate `BrokerConfig::super_users`
/// with exactly those names.
#[test]
fn super_users_toml_populates_broker_config_set() {
    let document = r#"
super_users = ["ANONYMOUS", "admin"]
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    for expected_name in ["ANONYMOUS", "admin"] {
        assert!(broker.super_users.contains(expected_name));
    }
    assert!(broker.super_users.len() == 2);
}
fn test_principal(name: &str) -> crabka_security::Principal {
crabka_security::Principal {
name: name.into(),
auth_method: crabka_security::AuthMethod::SaslPlain,
groups: vec![],
}
}
/// `[authorization] type = "simple"` must populate `super_users`, allow a
/// listed super-user, and deny another principal when the metadata image
/// holds no ACLs.
#[test]
fn authorization_section_simple_builds_simple_acl_authorizer() {
    use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
    use crabka_metadata::{AclOperation, MetadataImage, ResourceType};

    let document = r#"
[authorization]
type = "simple"
super_users = ["admin"]
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();
    assert!(
        broker.super_users.contains("admin"),
        "[authorization].super_users must populate BrokerConfig.super_users for act-as parity"
    );

    // Shared fixtures: a fresh metadata image and a client address.
    let image = MetadataImage::new(uuid::Uuid::nil());
    let client_addr: std::net::SocketAddr = "127.0.0.1:9092".parse().unwrap();

    // The configured super-user is allowed.
    let admin = test_principal("admin");
    let admin_read = AuthorizationRequest {
        principal: &admin,
        host: &client_addr,
        resource_type: ResourceType::Topic,
        resource_name: "t",
        operation: AclOperation::Read,
    };
    let admin_verdict = broker.authorizer.authorize(&image, &admin_read);
    assert!(admin_verdict == AuthorizationResult::Allow);

    // Any other principal is denied.
    let alice = test_principal("alice");
    let alice_read = AuthorizationRequest {
        principal: &alice,
        host: &client_addr,
        resource_type: ResourceType::Topic,
        resource_name: "t",
        operation: AclOperation::Read,
    };
    let alice_verdict = broker.authorizer.authorize(&image, &alice_read);
    assert!(
        alice_verdict == AuthorizationResult::Deny,
        "type=simple must default-deny non-super-users with no matching ACL"
    );
}
/// `[authorization] type = "opa"` must populate `super_users` and allow a
/// listed super-user. The whole body runs inside a Tokio runtime's
/// `block_on`.
#[test]
fn authorization_section_opa_builds_opa_authorizer() {
    use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
    use crabka_metadata::{AclOperation, MetadataImage, ResourceType};

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        let document = r#"
[authorization]
type = "opa"
super_users = ["ANONYMOUS"]
[authorization.opa]
url = "http://opa.invalid:8181/v1/data/k/a"
allow_on_error = false
maximum_cache_size = 100
expire_after_ms = 60000
"#;
        let parsed: FileConfig = toml::from_str(document).unwrap();
        let mut broker = crate::config::BrokerConfig::default();
        parsed.apply_to(&mut broker).unwrap();
        assert!(broker.super_users.contains("ANONYMOUS"));

        let image = MetadataImage::new(uuid::Uuid::nil());
        let anonymous = test_principal("ANONYMOUS");
        let client_addr: std::net::SocketAddr = "127.0.0.1:9092".parse().unwrap();
        let read_topic = AuthorizationRequest {
            principal: &anonymous,
            host: &client_addr,
            resource_type: ResourceType::Topic,
            resource_name: "t",
            operation: AclOperation::Read,
        };

        let verdict = broker.authorizer.authorize(&image, &read_topic);
        assert!(
            verdict == AuthorizationResult::Allow,
            "OPA super-user bypass must short-circuit before any HTTP call"
        );
    });
}
/// When the OPA section omits `allow_on_error`, the parsed value must be
/// `false`, and an `OpaAuthorizer` built from the parsed options (with no
/// super-users, pointed at `opa.invalid`) must answer `Deny`.
#[test]
fn opa_allow_on_error_defaults_to_fail_closed_when_omitted() {
    let document = r#"
url = "http://opa.invalid:8181/v1/data/k/a"
maximum_cache_size = 100
expire_after_ms = 60000
"#;
    let opa_section: FileOpaConfig = toml::from_str(document).unwrap();
    assert!(
        !opa_section.allow_on_error,
        "allow_on_error must default to false"
    );

    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async {
        use crate::authorizer::{AuthorizationRequest, AuthorizationResult, Authorizer};
        use crabka_metadata::{AclOperation, MetadataImage, ResourceType};

        // No super-users, so the request cannot be allowed by a bypass.
        let no_super_users = std::collections::HashSet::new();
        let authorizer = crate::authorizer::opa::OpaAuthorizer::new(
            no_super_users,
            "http://opa.invalid:8181/v1/data/k/a".to_string(),
            opa_section.allow_on_error,
            opa_section.maximum_cache_size,
            opa_section.expire_after_ms,
        )
        .unwrap();

        let image = MetadataImage::new(uuid::Uuid::nil());
        let alice = test_principal("alice");
        let client_addr: std::net::SocketAddr = "127.0.0.1:9092".parse().unwrap();
        let read_topic = AuthorizationRequest {
            principal: &alice,
            host: &client_addr,
            resource_type: ResourceType::Topic,
            resource_name: "t",
            operation: AclOperation::Read,
        };

        let verdict = authorizer.authorize(&image, &read_topic);
        assert!(
            verdict == AuthorizationResult::Deny,
            "OPA outage with default allow_on_error must fail closed (Deny)"
        );
    });
}
/// With no `[authorization]` section, the broker's authorizer must allow a
/// read by an arbitrary principal.
#[test]
fn authorization_section_absent_defaults_to_allow_all() {
    use crate::authorizer::{AuthorizationRequest, AuthorizationResult};
    use crabka_metadata::{AclOperation, MetadataImage, ResourceType};

    let parsed: FileConfig = toml::from_str("").unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let image = MetadataImage::new(uuid::Uuid::nil());
    let arbitrary = test_principal("anyone");
    let client_addr: std::net::SocketAddr = "127.0.0.1:9092".parse().unwrap();
    let read_topic = AuthorizationRequest {
        principal: &arbitrary,
        host: &client_addr,
        resource_type: ResourceType::Topic,
        resource_name: "t",
        operation: AclOperation::Read,
    };

    let verdict = broker.authorizer.authorize(&image, &read_topic);
    assert!(verdict == AuthorizationResult::Allow);
}
/// `[process] roles = ["controller"]` must set the broker's roles to exactly
/// `[Controller]`.
#[test]
fn process_roles_controller_only_from_toml() {
    let document = r#"
[process]
roles = ["controller"]
"#;
    let parsed = toml::from_str::<FileConfig>(document).expect("parse");
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).expect("apply");

    let expected_roles = vec![crate::config::NodeRole::Controller];
    assert!(broker.roles == expected_roles);
}
/// `[process] roles = ["broker", "controller"]` must set the broker's roles
/// to `[Broker, Controller]`, in the order given by the file.
#[test]
fn process_roles_both_from_toml() {
    use crate::config::NodeRole;

    let document = r#"
[process]
roles = ["broker", "controller"]
"#;
    let parsed = toml::from_str::<FileConfig>(document).expect("parse");
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).expect("apply");

    let expected_roles = vec![NodeRole::Broker, NodeRole::Controller];
    assert!(broker.roles == expected_roles);
}
/// An unrecognised role name parses as TOML but must make `apply_to` fail
/// with `FileConfigError::InvalidConfig`.
#[test]
fn process_roles_rejects_unknown_role() {
    let document = r#"
[process]
roles = ["wizard"]
"#;
    let parsed = toml::from_str::<FileConfig>(document).expect("parse");
    let mut broker = crate::config::BrokerConfig::default();

    let failure = parsed
        .apply_to(&mut broker)
        .expect_err("unknown role rejected");
    assert!(matches!(failure, FileConfigError::InvalidConfig(_)));
}
/// With no `[process]` section, the broker's roles must stay at
/// `[Controller, Broker]`.
#[test]
fn process_section_absent_leaves_default_roles() {
    use crate::config::NodeRole;

    let parsed = toml::from_str::<FileConfig>("").expect("parse");
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).expect("apply");

    let expected_roles = vec![NodeRole::Controller, NodeRole::Broker];
    assert!(broker.roles == expected_roles);
}
/// Top-level `rack` and `replica_selector = "rack-aware"` must set the
/// broker's rack and select `ReplicaSelectorKind::RackAware`.
#[test]
fn apply_to_parses_rack_and_replica_selector() {
    let document = r#"
broker_id = 0
rack = "az-1"
replica_selector = "rack-aware"
"#;
    let parsed = toml::from_str::<FileConfig>(document).expect("parse");
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).expect("apply");

    assert!(broker.rack.as_deref() == Some("az-1"));
    assert!(
        broker.replica_selector == crate::replica_selector::ReplicaSelectorKind::RackAware
    );
}
/// An unrecognised `replica_selector` value parses as TOML but must make
/// `apply_to` return an error.
#[test]
fn apply_to_rejects_unknown_replica_selector() {
    let document = r#"
broker_id = 0
replica_selector = "nonsense"
"#;
    let parsed = toml::from_str::<FileConfig>(document).expect("parse");
    let mut broker = crate::config::BrokerConfig::default();

    let outcome = parsed.apply_to(&mut broker);
    assert!(outcome.is_err());
}
/// Every field of a fully populated `[gssapi]` section must be carried into
/// `BrokerConfig::gssapi`, with the second rule parsed as `Rule::Default`.
#[test]
fn apply_to_gssapi_maps_all_fields() {
    let document = r#"
broker_id = 1
[gssapi]
keytab_path = "/etc/crabka/gssapi-keytab/keytab"
service_name = "kafka"
principal_to_local_rules = ["RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//", "DEFAULT"]
realm = "EXAMPLE.COM"
kdc = "tcp://kdc:88"
"#;
    let parsed = toml::from_str::<FileConfig>(document).expect("parse [gssapi]");
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).expect("apply [gssapi]");

    let gssapi = broker.gssapi.expect("gssapi config present");
    let expected_keytab = std::path::PathBuf::from("/etc/crabka/gssapi-keytab/keytab");
    assert!(gssapi.keytab_path == expected_keytab);
    assert!(gssapi.service_name == "kafka");
    assert!(gssapi.principal_to_local_rules.len() == 2);
    let second_is_default = matches!(
        gssapi.principal_to_local_rules[1],
        crabka_security::gssapi::name::Rule::Default
    );
    assert!(second_is_default);
    assert!(gssapi.realm.as_deref() == Some("EXAMPLE.COM"));
    assert!(gssapi.kdc.as_deref() == Some("tcp://kdc:88"));
}
/// A `[gssapi]` section that omits `service_name` must end up with the
/// service name "kafka".
#[test]
fn apply_to_gssapi_defaults_service_name_to_kafka() {
    let document = r#"
[gssapi]
keytab_path = "/k/keytab"
principal_to_local_rules = ["DEFAULT"]
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let gssapi = broker.gssapi.unwrap();
    assert!(gssapi.service_name == "kafka");
}
/// A `principal_to_local_rules` entry that is not a valid rule must make
/// `apply_to` fail with `FileConfigError::InvalidConfig`.
#[test]
fn apply_to_gssapi_rejects_malformed_rule() {
    let document = r#"
[gssapi]
keytab_path = "/k/keytab"
principal_to_local_rules = ["NOT_A_RULE:::"]
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();

    let failure = parsed.apply_to(&mut broker).unwrap_err();
    assert!(matches!(failure, FileConfigError::InvalidConfig(_)));
}
/// `[inter_broker_credentials] type = "gssapi"` must produce the `Gssapi`
/// credentials variant with every field taken from the file.
#[test]
fn apply_to_inter_broker_credentials_gssapi() {
    let document = r#"
[inter_broker_credentials]
type = "gssapi"
keytab_path = "/etc/crabka/gssapi-keytab/keytab"
client_principal = "kafka@EXAMPLE.COM"
service_name = "kafka"
kdc_url = "tcp://kdc:88"
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let credentials = broker.inter_broker_credentials.expect("ib creds present");
    match credentials {
        crate::config::InterBrokerCredentials::Gssapi {
            keytab_path,
            client_principal,
            service_name,
            kdc_url,
        } => {
            let expected_keytab = std::path::PathBuf::from("/etc/crabka/gssapi-keytab/keytab");
            assert!(keytab_path == expected_keytab);
            assert!(client_principal == "kafka@EXAMPLE.COM");
            assert!(service_name == "kafka");
            assert!(kdc_url == "tcp://kdc:88");
        }
        other => panic!("expected Gssapi, got {other:?}"),
    }
}
/// An unrecognised `[inter_broker_credentials].type` must already fail when
/// the TOML is deserialized into `FileConfig`.
#[test]
fn apply_to_inter_broker_credentials_rejects_unknown_type() {
    let document = r#"
[inter_broker_credentials]
type = "carrier-pigeon"
"#;
    let parse_outcome = toml::from_str::<FileConfig>(document);
    assert!(parse_outcome.is_err());
}
/// GSSAPI inter-broker credentials that omit `service_name` must end up with
/// the service name "kafka".
#[test]
fn apply_to_inter_broker_credentials_defaults_service_name_to_kafka() {
    let document = r#"
[inter_broker_credentials]
type = "gssapi"
keytab_path = "/k/keytab"
client_principal = "kafka@EXAMPLE.COM"
kdc_url = "tcp://kdc:88"
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let credentials = broker.inter_broker_credentials.unwrap();
    match credentials {
        crate::config::InterBrokerCredentials::Gssapi { service_name, .. } => {
            assert!(service_name == "kafka");
        }
        other => panic!("expected Gssapi, got {other:?}"),
    }
}
/// The JSON schema generated for `FileConfig` must serialize to JSON and
/// contain a top-level `properties` entry.
#[test]
fn file_config_schema_generates() {
    let schema = schemars::schema_for!(FileConfig);
    let as_json = serde_json::to_value(&schema).expect("schema serializes");

    let has_properties = as_json.get("properties").is_some();
    assert!(has_properties, "FileConfig schema has properties");
}
/// `in_memory = true` under `[remote_storage.kafka_metadata]` must switch
/// the remote log metadata kind to `RlmmKind::InMemory`.
#[test]
fn kafka_metadata_in_memory_true_opts_out_to_in_memory_rlmm() {
    let document = r#"
[remote_storage]
storage_dir = "/tmp/tier"
[remote_storage.kafka_metadata]
in_memory = true
"#;
    let parsed: FileConfig = toml::from_str(document).unwrap();
    let mut broker = crate::config::BrokerConfig::default();
    parsed.apply_to(&mut broker).unwrap();

    let is_in_memory = matches!(
        broker.remote_log_metadata,
        crate::config::RlmmKind::InMemory
    );
    assert!(
        is_in_memory,
        "in_memory = true must opt out to RlmmKind::InMemory, got {:?}",
        broker.remote_log_metadata
    );
}
}