use std::collections::HashMap;
use std::path::{Path, PathBuf};
use thiserror::Error;
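/// Errors produced while loading Kafka client configuration from a file.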
#[derive(Debug, Error)]
pub enum KafkaConfigError {
#[error("kafka config file not found: {path}")]
FileNotFound { path: PathBuf },
#[error("unsupported kafka config format: {ext}. Supported: .properties, .yaml, .yml, .json")]
UnsupportedFormat { ext: String },
#[error("parse error in {path}: {message}")]
ParseError { path: String, message: String },
#[error("io error reading kafka config: {0}")]
Io(#[from] std::io::Error),
}
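/// Convenience alias for fallible operations in this module.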
pub type KafkaConfigResult<T> = Result<T, KafkaConfigError>;
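/// Loads a Kafka client configuration from a file, dispatching on the
/// file extension: `.properties` is parsed natively, while `.yaml`/`.yml`
/// and `.json` require the `directory-config` and `config` cargo features
/// respectively.
///
/// ```ignore
/// let config = config_from_file("kafka.properties")?;
/// println!("{}", config["bootstrap.servers"]);
/// ```
///
/// # Errors
///
/// Returns [`KafkaConfigError::FileNotFound`] if the file does not exist,
/// [`KafkaConfigError::UnsupportedFormat`] for unknown extensions or
/// formats whose feature is disabled, and [`KafkaConfigError::ParseError`]
/// if the content cannot be deserialized.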
pub fn config_from_file(path: impl AsRef<Path>) -> KafkaConfigResult<HashMap<String, String>> {
let path = path.as_ref();
let content = std::fs::read_to_string(path).map_err(|e| {
if e.kind() == std::io::ErrorKind::NotFound {
KafkaConfigError::FileNotFound {
path: path.to_path_buf(),
}
} else {
KafkaConfigError::Io(e)
}
})?;
let ext = path
.extension()
.and_then(|s| s.to_str())
.unwrap_or("")
.to_lowercase();
let path_str = path.display().to_string();
match ext.as_str() {
"properties" => Ok(config_from_properties_str(&content)),
"yaml" | "yml" => parse_yaml(&content, path_str),
"json" => parse_json(&content, path_str),
other => Err(KafkaConfigError::UnsupportedFormat {
ext: other.to_string(),
}),
}
}
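/// Parses Java-style `.properties` content into a map. Only the common
/// subset is supported: `key=value` pairs plus `#` and `!` line comments;
/// colon separators and backslash line continuations are not handled, and
/// lines without an `=` are silently skipped.
///
/// ```ignore
/// let config = config_from_properties_str("bootstrap.servers=kafka:9092\n");
/// assert_eq!(config["bootstrap.servers"], "kafka:9092");
/// ```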
#[must_use]
pub fn config_from_properties_str(content: &str) -> HashMap<String, String> {
let mut config = HashMap::new();
for line in content.lines() {
let line = line.trim();
if line.is_empty() || line.starts_with('#') || line.starts_with('!') {
continue;
}
if let Some((key, value)) = line.split_once('=') {
config.insert(key.trim().to_string(), value.trim().to_string());
}
}
config
}
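/// Parses a flat YAML mapping of string keys to string values. Compiled
/// only with the `directory-config` feature; otherwise reports the format
/// as unsupported.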
fn parse_yaml(content: &str, path: String) -> KafkaConfigResult<HashMap<String, String>> {
#[cfg(feature = "directory-config")]
{
serde_yaml_ng::from_str(content).map_err(|e| KafkaConfigError::ParseError {
path,
message: e.to_string(),
})
}
#[cfg(not(feature = "directory-config"))]
{
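// Parser compiled out: consume the arguments to avoid unused warnings
// and surface the feature hint through the `ext` field.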
let _ = (content, path);
Err(KafkaConfigError::UnsupportedFormat {
ext: "yaml (enable the `directory-config` feature)".to_string(),
})
}
}
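/// Parses a flat JSON object of string keys to string values. Compiled
/// only with the `config` feature; otherwise reports the format as
/// unsupported.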
fn parse_json(content: &str, path: String) -> KafkaConfigResult<HashMap<String, String>> {
#[cfg(feature = "config")]
{
serde_json::from_str(content).map_err(|e| KafkaConfigError::ParseError {
path,
message: e.to_string(),
})
}
#[cfg(not(feature = "config"))]
{
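// Parser compiled out: consume the arguments to avoid unused warnings
// and surface the feature hint through the `ext` field.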
let _ = (content, path);
Err(KafkaConfigError::UnsupportedFormat {
ext: "json (enable the `config` feature)".to_string(),
})
}
}
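/// Merges a static profile with user-supplied overrides. On key
/// collisions the override value wins, because overrides are inserted
/// after the profile entries.
///
/// ```ignore
/// let mut overrides = HashMap::new();
/// overrides.insert("linger.ms".to_string(), "5".to_string());
/// let merged = merge_with_overrides(PRODUCER_PRODUCTION, &overrides);
/// assert_eq!(merged["linger.ms"], "5");
/// ```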
#[must_use]
pub fn merge_with_overrides<S: std::hash::BuildHasher>(
profile: &[(&str, &str)],
overrides: &HashMap<String, String, S>,
) -> HashMap<String, String> {
let mut config = HashMap::with_capacity(profile.len() + overrides.len());
for (key, value) in profile {
config.insert((*key).to_string(), (*value).to_string());
}
for (key, value) in overrides {
config.insert(key.clone(), value.clone());
}
config
}
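/// Consumer profile for production throughput: 1 MiB minimum fetches,
/// a deep local queue, cooperative-sticky rebalancing, and manual offset
/// commits.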
pub const CONSUMER_PRODUCTION: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("fetch.min.bytes", "1048576"),
("fetch.wait.max.ms", "100"),
("queued.min.messages", "20000"),
("enable.auto.commit", "false"),
("statistics.interval.ms", "1000"),
];
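/// Consumer profile for development and tests: a small local queue,
/// fast reconnect backoff, and connection-close logging for easier
/// debugging.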
pub const CONSUMER_DEVTEST: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("queued.min.messages", "1000"),
("enable.auto.commit", "false"),
("reconnect.backoff.ms", "10"),
("reconnect.backoff.max.ms", "100"),
("log.connection.close", "true"),
("statistics.interval.ms", "1000"),
];
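/// Consumer profile for latency-sensitive workloads: a 10 ms maximum
/// fetch wait and a small local queue.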
pub const CONSUMER_LOW_LATENCY: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("fetch.wait.max.ms", "10"),
("queued.min.messages", "1000"),
("enable.auto.commit", "false"),
("reconnect.backoff.ms", "10"),
("reconnect.backoff.max.ms", "100"),
("statistics.interval.ms", "1000"),
];
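/// Producer profile for production throughput: 100 ms batching, zstd
/// compression, and Nagle's algorithm disabled on the socket.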
pub const PRODUCER_PRODUCTION: &[(&str, &str)] = &[
("linger.ms", "100"),
("compression.type", "zstd"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
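/// Producer profile for idempotent, exactly-once-style delivery:
/// idempotence enabled, `acks=all`, and in-flight requests capped at 5.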
pub const PRODUCER_EXACTLY_ONCE: &[(&str, &str)] = &[
("enable.idempotence", "true"),
("acks", "all"),
("max.in.flight.requests.per.connection", "5"),
("linger.ms", "20"),
("compression.type", "zstd"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
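/// Producer profile for minimum latency: no lingering, `acks=1`, and
/// cheaper lz4 compression.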
pub const PRODUCER_LOW_LATENCY: &[(&str, &str)] = &[
("acks", "1"),
("linger.ms", "0"),
("compression.type", "lz4"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
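/// Producer profile for development and tests: `acks=1` with otherwise
/// default batching behavior.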
pub const PRODUCER_DEVTEST: &[(&str, &str)] = &[
("acks", "1"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
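/// Default suffix for landing (raw input) topics.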
pub const TOPIC_SUFFIX_LAND: &str = "_land";
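/// Default suffix for load (processed output) topics.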
pub const TOPIC_SUFFIX_LOAD: &str = "_load";
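/// How a service consumes topics, which determines the shape of its
/// consumer group name.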
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServiceRole {
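/// Per-pipeline service; each source or pipeline gets its own consumer group.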
Transform,
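/// Fleet-wide service; all instances share a single consumer group.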
Universal,
}
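/// A named data source plus the suffixes used to derive its landing
/// (raw input) and load (processed output) topic names.
///
/// ```ignore
/// let source = DfeSource::new("syslog");
/// assert_eq!(source.input_topic(), "syslog_land");
/// assert_eq!(source.output_topic(), "syslog_load");
/// ```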
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DfeSource {
name: String,
land_suffix: String,
load_suffix: String,
}
impl DfeSource {
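/// Creates a source using the default `_land`/`_load` topic suffixes.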
#[must_use]
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
land_suffix: TOPIC_SUFFIX_LAND.to_string(),
load_suffix: TOPIC_SUFFIX_LOAD.to_string(),
}
}
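/// Creates a source with custom landing and load suffixes.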
#[must_use]
pub fn with_suffixes(
name: impl Into<String>,
land_suffix: impl Into<String>,
load_suffix: impl Into<String>,
) -> Self {
Self {
name: name.into(),
land_suffix: land_suffix.into(),
load_suffix: load_suffix.into(),
}
}
#[must_use]
pub fn name(&self) -> &str {
&self.name
}
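/// The topic this source's raw events land on, e.g. `syslog_land`.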
#[must_use]
pub fn input_topic(&self) -> String {
format!("{}{}", self.name, self.land_suffix)
}
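/// The topic processed events are loaded onto, e.g. `syslog_load`.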
#[must_use]
pub fn output_topic(&self) -> String {
format!("{}{}", self.name, self.load_suffix)
}
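/// Derives the consumer group name for `service` reading this source.
///
/// Precedence: an explicit `cg_override` wins outright; otherwise
/// `Transform` services get a per-pipeline group named
/// `dfe-{service}-{pipeline or source name}`, and `Universal` services
/// share a single `dfe-{service}` group.
///
/// ```ignore
/// let source = DfeSource::new("syslog");
/// let cg = source
///     .consumer_group("transform-vector", ServiceRole::Transform, None, None)?;
/// assert_eq!(cg, "dfe-transform-vector-syslog");
/// ```
///
/// # Errors
///
/// Fails for a `Transform` role when both the source name and `pipeline`
/// are empty, since a bare `dfe-{service}` group would make unrelated
/// pipelines compete for the same messages.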
pub fn consumer_group(
&self,
service: &str,
role: ServiceRole,
pipeline: Option<&str>,
cg_override: Option<&str>,
) -> KafkaConfigResult<String> {
if let Some(cg) = cg_override {
return Ok(cg.to_string());
}
match role {
ServiceRole::Transform => {
let suffix = pipeline.unwrap_or(&self.name);
if suffix.is_empty() {
return Err(KafkaConfigError::ParseError {
path: String::new(),
message: format!(
"transform service '{service}' requires a source or pipeline \
name for its consumer group; a bare 'dfe-{service}' group would \
cause multiple pipelines to compete for messages"
),
});
}
Ok(format!("dfe-{service}-{suffix}"))
}
ServiceRole::Universal => Ok(format!("dfe-{service}")),
}
}
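/// Extracts the source name from a topic carrying one of the *default*
/// suffixes; topics built with custom suffixes via [`Self::with_suffixes`]
/// are not recognized and return `None`.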
#[must_use]
pub fn source_from_topic(topic: &str) -> Option<&str> {
topic
.strip_suffix(TOPIC_SUFFIX_LAND)
.or_else(|| topic.strip_suffix(TOPIC_SUFFIX_LOAD))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn consumer_production_only_non_defaults() {
assert_eq!(CONSUMER_PRODUCTION.len(), 6);
let map: HashMap<&str, &str> = CONSUMER_PRODUCTION.iter().copied().collect();
assert_eq!(map["partition.assignment.strategy"], "cooperative-sticky");
assert_eq!(map["fetch.min.bytes"], "1048576");
assert_eq!(map["fetch.wait.max.ms"], "100");
assert_eq!(map["queued.min.messages"], "20000");
assert_eq!(map["enable.auto.commit"], "false");
assert_eq!(map["statistics.interval.ms"], "1000");
}
#[test]
fn producer_production_only_non_defaults() {
assert_eq!(PRODUCER_PRODUCTION.len(), 4);
let map: HashMap<&str, &str> = PRODUCER_PRODUCTION.iter().copied().collect();
assert_eq!(map["linger.ms"], "100");
assert_eq!(map["compression.type"], "zstd");
assert_eq!(map["socket.nagle.disable"], "true");
assert_eq!(map["statistics.interval.ms"], "1000");
}
#[test]
fn merge_user_overrides_win() {
let mut overrides = HashMap::new();
overrides.insert("fetch.min.bytes".to_string(), "2097152".to_string());
overrides.insert("custom.setting".to_string(), "value".to_string());
let merged = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);
assert_eq!(merged["fetch.min.bytes"], "2097152");
assert_eq!(merged["custom.setting"], "value");
assert_eq!(
merged["partition.assignment.strategy"],
"cooperative-sticky"
);
}
#[test]
fn merge_empty_overrides_returns_profile() {
let overrides = HashMap::new();
let merged = merge_with_overrides(CONSUMER_PRODUCTION, &overrides);
assert_eq!(merged.len(), CONSUMER_PRODUCTION.len());
}
#[test]
fn merge_empty_profile_returns_overrides() {
let mut overrides = HashMap::new();
overrides.insert("key".to_string(), "value".to_string());
let merged = merge_with_overrides(&[], &overrides);
assert_eq!(merged.len(), 1);
assert_eq!(merged["key"], "value");
}
#[test]
fn properties_str_basic() {
let content = "\
# This is a comment
bootstrap.servers=kafka1:9092,kafka2:9092
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
! Another comment style
";
let config = config_from_properties_str(content);
assert_eq!(config.len(), 3);
assert_eq!(config["bootstrap.servers"], "kafka1:9092,kafka2:9092");
assert_eq!(config["security.protocol"], "SASL_SSL");
assert_eq!(config["sasl.mechanism"], "SCRAM-SHA-512");
}
#[test]
fn properties_str_value_with_equals() {
let content = "ssl.certificate.pem=MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMI==\n";
let config = config_from_properties_str(content);
assert_eq!(
config["ssl.certificate.pem"],
"MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMI=="
);
}
#[test]
fn properties_str_empty_and_whitespace() {
let config = config_from_properties_str(" \n# comment\n\n");
assert!(config.is_empty());
}
#[test]
fn config_from_file_properties() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("kafka.properties");
std::fs::write(
&path,
"bootstrap.servers=kafka:9092\ncompression.type=zstd\n",
)
.unwrap();
let config = config_from_file(&path).unwrap();
assert_eq!(config["bootstrap.servers"], "kafka:9092");
assert_eq!(config["compression.type"], "zstd");
}
#[test]
fn config_from_file_not_found() {
let result = config_from_file("/nonexistent/kafka.properties");
assert!(matches!(result, Err(KafkaConfigError::FileNotFound { .. })));
}
#[test]
fn dfe_source_default_topics() {
let source = DfeSource::new("syslog");
assert_eq!(source.name(), "syslog");
assert_eq!(source.input_topic(), "syslog_land");
assert_eq!(source.output_topic(), "syslog_load");
}
#[test]
fn dfe_source_custom_suffixes() {
let source = DfeSource::with_suffixes("auth", "_raw", "_enriched");
assert_eq!(source.input_topic(), "auth_raw");
assert_eq!(source.output_topic(), "auth_enriched");
}
#[test]
fn dfe_source_cg_transform_default() {
let source = DfeSource::new("syslog");
assert_eq!(
source
.consumer_group("transform-vector", ServiceRole::Transform, None, None)
.unwrap(),
"dfe-transform-vector-syslog"
);
}
#[test]
fn dfe_source_cg_transform_with_pipeline() {
let source = DfeSource::new("syslog");
assert_eq!(
source
.consumer_group(
"transform-vector",
ServiceRole::Transform,
Some("syslog-enriched"),
None
)
.unwrap(),
"dfe-transform-vector-syslog-enriched"
);
}
#[test]
fn dfe_source_cg_transform_empty_source_errors() {
let source = DfeSource::new("");
assert!(
source
.consumer_group("transform-vector", ServiceRole::Transform, None, None)
.is_err()
);
}
#[test]
fn dfe_source_cg_transform_empty_source_pipeline_rescues() {
let source = DfeSource::new("");
assert_eq!(
source
.consumer_group(
"transform-vector",
ServiceRole::Transform,
Some("syslog"),
None
)
.unwrap(),
"dfe-transform-vector-syslog"
);
}
#[test]
fn dfe_source_cg_universal() {
let source = DfeSource::new("netflow");
assert_eq!(
source
.consumer_group("loader", ServiceRole::Universal, None, None)
.unwrap(),
"dfe-loader"
);
}
#[test]
fn dfe_source_cg_universal_ignores_pipeline() {
let source = DfeSource::new("syslog");
assert_eq!(
source
.consumer_group("archiver", ServiceRole::Universal, Some("ignored"), None)
.unwrap(),
"dfe-archiver"
);
}
#[test]
fn dfe_source_cg_override_wins() {
let source = DfeSource::new("syslog");
assert_eq!(
source
.consumer_group(
"transform-vector",
ServiceRole::Transform,
None,
Some("my-custom-cg")
)
.unwrap(),
"my-custom-cg"
);
}
#[test]
fn dfe_source_cg_override_wins_universal() {
let source = DfeSource::new("syslog");
assert_eq!(
source
.consumer_group(
"loader",
ServiceRole::Universal,
None,
Some("custom-loader-cg")
)
.unwrap(),
"custom-loader-cg"
);
}
#[test]
fn dfe_source_from_topic_land() {
assert_eq!(DfeSource::source_from_topic("syslog_land"), Some("syslog"));
assert_eq!(DfeSource::source_from_topic("auth_land"), Some("auth"));
}
#[test]
fn dfe_source_from_topic_load() {
assert_eq!(DfeSource::source_from_topic("syslog_load"), Some("syslog"));
assert_eq!(
DfeSource::source_from_topic("netflow_load"),
Some("netflow")
);
}
#[test]
fn dfe_source_from_topic_unknown() {
assert_eq!(DfeSource::source_from_topic("unknown"), None);
assert_eq!(DfeSource::source_from_topic("events"), None);
assert_eq!(DfeSource::source_from_topic(""), None);
}
#[test]
fn dfe_source_from_topic_edge_cases() {
assert_eq!(DfeSource::source_from_topic("_land"), Some(""));
assert_eq!(DfeSource::source_from_topic("a_load"), Some("a"));
}
#[test]
fn config_from_file_unsupported_extension() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("kafka.toml");
std::fs::write(&path, "key = value\n").unwrap();
let result = config_from_file(&path);
assert!(matches!(
result,
Err(KafkaConfigError::UnsupportedFormat { .. })
));
}
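// A file with no extension is rejected like any other unknown format.
#[test]
fn config_from_file_no_extension() {
let dir = tempfile::tempdir().unwrap();
let path = dir.path().join("kafka");
std::fs::write(&path, "key=value\n").unwrap();
let result = config_from_file(&path);
assert!(matches!(
result,
Err(KafkaConfigError::UnsupportedFormat { .. })
));
}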
}