use crate::error::{KlagError, Result};
use regex::Regex;
use serde::Deserialize;
use std::collections::HashMap;
use std::path::Path;
use std::time::Duration;
/// Top-level configuration: exporter-wide settings plus one or more Kafka
/// clusters to monitor. Deserialized from TOML by [`Config::load`].
#[derive(Debug, Deserialize, Clone)]
pub struct Config {
pub exporter: ExporterConfig,
pub clusters: Vec<ClusterConfig>,
}
/// Settings for the exporter process itself: HTTP endpoint, poll cadence,
/// timestamp sampling, OTel export, leader election, and performance tuning.
#[derive(Debug, Deserialize, Clone)]
pub struct ExporterConfig {
/// Interval between collection polls (humantime string, e.g. "30s"; default 30s).
#[serde(with = "humantime_serde", default = "default_poll_interval")]
pub poll_interval: Duration,
/// Port for the HTTP server (default 8000).
#[serde(default = "default_http_port")]
pub http_port: u16,
/// Interface the HTTP server binds to (default "0.0.0.0", all interfaces).
#[serde(default = "default_http_host")]
pub http_host: String,
/// Whether metrics are emitted per topic or per partition (default: topic).
#[serde(default = "default_granularity")]
pub granularity: Granularity,
/// Timestamp/time-lag sampling settings; fully defaulted when omitted.
#[serde(default)]
pub timestamp_sampling: TimestampSamplingConfig,
/// OpenTelemetry export settings; disabled by default.
#[serde(default)]
pub otel: OtelConfig,
/// Leader-election settings; disabled by default.
#[serde(default)]
pub leadership: LeadershipConfig,
/// Concurrency/timeout/cache tuning; fully defaulted when omitted.
#[serde(default)]
pub performance: PerformanceConfig,
}
/// Metric granularity. TOML values are lowercase: "topic" or "partition".
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum Granularity {
Topic,
Partition,
}
/// Strategy for timestamp sampling. TOML values are lowercase:
/// "message" (fetch actual messages) or "rate" (estimate from offset rates;
/// the default — see `default_timestamp_sampling_mode`).
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum TimestampSamplingMode {
Message,
Rate,
}
/// Configuration for timestamp sampling (used to derive time lag).
/// Validation rules for these fields live in `Config::validate`.
#[derive(Debug, Deserialize, Clone)]
pub struct TimestampSamplingConfig {
/// Master switch for sampling (default: true).
#[serde(default = "default_true")]
pub enabled: bool,
/// Sampling strategy (default: rate).
#[serde(default = "default_timestamp_sampling_mode")]
pub mode: TimestampSamplingMode,
/// TTL for cached timestamps (default 60s).
#[serde(with = "humantime_serde", default = "default_cache_ttl")]
pub cache_ttl: Duration,
/// Max parallel fetches; must be >= 1 when mode = "message" (default 5).
#[serde(default = "default_max_concurrent_fetches")]
pub max_concurrent_fetches: usize,
/// Samples kept for rate estimation; must be >= 2 in rate mode (default 5).
#[serde(default = "default_rate_history_samples")]
pub rate_history_samples: usize,
/// Max age of retained rate samples (default 600s).
#[serde(with = "humantime_serde", default = "default_rate_history_max_age")]
pub rate_history_max_age: Duration,
/// Rate floor, msgs/sec; must be finite and >= 0 (default 0.01).
#[serde(default = "default_rate_min_msgs_per_sec")]
pub rate_min_msgs_per_sec: f64,
}
/// Concurrency limits, timeouts, and cache TTLs for Kafka access.
#[derive(Debug, Deserialize, Clone)]
pub struct PerformanceConfig {
/// General Kafka operation timeout (default 30s).
#[serde(with = "humantime_serde", default = "default_kafka_timeout")]
pub kafka_timeout: Duration,
/// Timeout for consumer-group offset fetches (default 10s).
#[serde(with = "humantime_serde", default = "default_offset_fetch_timeout")]
pub offset_fetch_timeout: Duration,
/// Consumer groups polled in parallel; must be >= 1 (default 10).
#[serde(default = "default_max_concurrent_groups")]
pub max_concurrent_groups: usize,
/// Watermark fetches in parallel; must be >= 1 (default 50).
#[serde(default = "default_max_concurrent_watermarks")]
pub max_concurrent_watermarks: usize,
/// Client recycle cadence (default 50). NOTE(review): the unit (polls?
/// seconds?) is not visible from this file — confirm against the consumer.
#[serde(default = "default_client_recycle_interval")]
pub client_recycle_interval: u64,
/// Blocking-thread-pool size; validated to be >= max_concurrent_groups +
/// message-mode max_concurrent_fetches + 4 (default 64).
#[serde(default = "default_max_blocking_threads")]
pub max_blocking_threads: usize,
/// TTL of the compacted-topics cache (default 1h).
#[serde(
with = "humantime_serde",
default = "default_compacted_topics_cache_ttl"
)]
pub compacted_topics_cache_ttl: Duration,
/// TTL of the cluster-metadata cache (default 5m).
#[serde(with = "humantime_serde", default = "default_metadata_cache_ttl")]
pub metadata_cache_ttl: Duration,
/// TTL of the consumer-group list cache (default 60s).
#[serde(
with = "humantime_serde",
default = "default_consumer_groups_cache_ttl"
)]
pub consumer_groups_cache_ttl: Duration,
}
/// OpenTelemetry metrics export settings.
#[derive(Debug, Deserialize, Clone)]
pub struct OtelConfig {
/// Whether OTel export is enabled (default: false via `bool::default`).
#[serde(default)]
pub enabled: bool,
/// OTLP endpoint (default "http://localhost:4317").
#[serde(default = "default_otel_endpoint")]
pub endpoint: String,
/// Interval between metric exports (default 60s).
#[serde(with = "humantime_serde", default = "default_export_interval")]
pub export_interval: Duration,
}
/// Leader-election settings for running multiple replicas.
#[derive(Debug, Deserialize, Clone)]
pub struct LeadershipConfig {
/// Whether leader election is enabled (default: false).
#[serde(default)]
pub enabled: bool,
/// Election backend (default: kubernetes — currently the only variant).
#[serde(default = "default_leadership_provider")]
pub provider: LeadershipProvider,
/// Name of the Lease object (default "klag-exporter").
#[serde(default = "default_lease_name")]
pub lease_name: String,
/// Namespace of the Lease object (default "default").
#[serde(default = "default_lease_namespace")]
pub lease_namespace: String,
/// Optional explicit candidate identity; None when omitted.
/// NOTE(review): presumably falls back to hostname/pod name — confirm at use site.
#[allow(dead_code)] pub identity: Option<String>,
/// Lease duration in seconds (default 15).
#[serde(default = "default_lease_duration")]
#[allow(dead_code)] pub lease_duration_secs: u32,
/// Grace period in seconds (default 5).
#[serde(default = "default_grace_period")]
#[allow(dead_code)] pub grace_period_secs: u32,
}
/// Available leader-election backends. TOML value is lowercase ("kubernetes").
#[derive(Debug, Deserialize, Clone, Copy, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum LeadershipProvider {
#[default]
Kubernetes,
}
/// Configuration for a single monitored Kafka cluster.
/// `name` and `bootstrap_servers` are required; see `ClusterConfig::validate`.
#[derive(Debug, Deserialize, Clone)]
pub struct ClusterConfig {
/// Cluster identifier; must be non-empty.
pub name: String,
/// Kafka bootstrap server list; must be non-empty.
pub bootstrap_servers: String,
/// Regexes selecting consumer groups to include (default [".*"], i.e. all).
#[serde(default = "default_whitelist")]
pub group_whitelist: Vec<String>,
/// Regexes excluding consumer groups (default: none).
#[serde(default)]
pub group_blacklist: Vec<String>,
/// Regexes selecting topics to include (default [".*"], i.e. all).
#[serde(default = "default_whitelist")]
pub topic_whitelist: Vec<String>,
/// Regexes excluding topics (default ["__.*"], i.e. Kafka internal topics).
#[serde(default = "default_topic_blacklist")]
pub topic_blacklist: Vec<String>,
/// Extra properties passed through to the Kafka consumer (e.g. SASL settings).
#[serde(default)]
pub consumer_properties: HashMap<String, String>,
/// Free-form key/value labels attached to this cluster.
/// NOTE(review): presumably surfaced as metric labels — confirm at use site.
#[serde(default)]
pub labels: HashMap<String, String>,
}
/// Default collection poll interval: 30 seconds.
fn default_poll_interval() -> Duration {
    Duration::from_secs(30)
}

/// Default HTTP listen port for the metrics endpoint.
fn default_http_port() -> u16 {
    8000
}

/// Default bind address: all interfaces.
fn default_http_host() -> String {
    String::from("0.0.0.0")
}
/// Default metric granularity: per-topic aggregation.
fn default_granularity() -> Granularity {
    Granularity::Topic
}
/// serde helper for boolean fields whose default is `true`.
fn default_true() -> bool {
    true
}

/// Default timestamp cache TTL: one minute.
fn default_cache_ttl() -> Duration {
    Duration::from_secs(60)
}

/// Default parallelism for message-mode timestamp fetches.
fn default_max_concurrent_fetches() -> usize {
    5
}
/// Default sampling strategy: rate-based estimation.
fn default_timestamp_sampling_mode() -> TimestampSamplingMode {
    TimestampSamplingMode::Rate
}
/// Default number of samples kept for rate estimation.
fn default_rate_history_samples() -> usize {
    5
}

/// Default maximum age of retained rate samples: ten minutes.
fn default_rate_history_max_age() -> Duration {
    Duration::from_secs(10 * 60)
}

/// Default rate floor in messages per second.
fn default_rate_min_msgs_per_sec() -> f64 {
    0.01
}

/// Default general Kafka operation timeout.
fn default_kafka_timeout() -> Duration {
    Duration::from_secs(30)
}

/// Default timeout for consumer-group offset fetches.
fn default_offset_fetch_timeout() -> Duration {
    Duration::from_secs(10)
}

/// Default number of consumer groups polled concurrently.
fn default_max_concurrent_groups() -> usize {
    10
}

/// Default number of watermark fetches in flight at once.
fn default_max_concurrent_watermarks() -> usize {
    50
}

/// Default client recycle cadence (unitless here; see PerformanceConfig).
fn default_client_recycle_interval() -> u64 {
    50
}

/// Default blocking-thread-pool size.
fn default_max_blocking_threads() -> usize {
    64
}

/// Default TTL for the compacted-topics cache: one hour.
fn default_compacted_topics_cache_ttl() -> Duration {
    Duration::from_secs(60 * 60)
}

/// Default TTL for the cluster-metadata cache: five minutes.
fn default_metadata_cache_ttl() -> Duration {
    Duration::from_secs(5 * 60)
}

/// Default TTL for the consumer-group list cache: one minute.
fn default_consumer_groups_cache_ttl() -> Duration {
    Duration::from_secs(60)
}

/// Default OTLP endpoint (local collector).
fn default_otel_endpoint() -> String {
    String::from("http://localhost:4317")
}

/// Default OTel export interval: one minute.
fn default_export_interval() -> Duration {
    Duration::from_secs(60)
}
/// Default leader-election backend.
fn default_leadership_provider() -> LeadershipProvider {
    LeadershipProvider::Kubernetes
}
/// Default Lease object name used for leader election.
fn default_lease_name() -> String {
    String::from("klag-exporter")
}

/// Default namespace for the Lease object.
fn default_lease_namespace() -> String {
    String::from("default")
}

/// Default lease duration in seconds.
fn default_lease_duration() -> u32 {
    15
}

/// Default grace period in seconds.
fn default_grace_period() -> u32 {
    5
}

/// Match-everything whitelist used for both groups and topics.
fn default_whitelist() -> Vec<String> {
    vec![String::from(".*")]
}

/// Exclude Kafka internal topics (names beginning with `__`) by default.
fn default_topic_blacklist() -> Vec<String> {
    vec![String::from("__.*")]
}
/// Defaults mirror the serde `default = "..."` helper functions so that a
/// config omitting the entire `[exporter.timestamp_sampling]` table and one
/// omitting individual keys produce identical settings.
impl Default for TimestampSamplingConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            mode: default_timestamp_sampling_mode(),
            cache_ttl: default_cache_ttl(),
            max_concurrent_fetches: default_max_concurrent_fetches(),
            rate_history_samples: default_rate_history_samples(),
            rate_history_max_age: default_rate_history_max_age(),
            rate_min_msgs_per_sec: default_rate_min_msgs_per_sec(),
        }
    }
}

/// OTel export is off by default; endpoint/interval match the serde defaults.
impl Default for OtelConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            endpoint: default_otel_endpoint(),
            export_interval: default_export_interval(),
        }
    }
}

/// Leader election is off by default; lease settings match the serde defaults.
impl Default for LeadershipConfig {
    fn default() -> Self {
        Self {
            enabled: false,
            provider: default_leadership_provider(),
            lease_name: default_lease_name(),
            lease_namespace: default_lease_namespace(),
            identity: None,
            lease_duration_secs: default_lease_duration(),
            grace_period_secs: default_grace_period(),
        }
    }
}

/// All performance knobs come straight from the serde default helpers.
impl Default for PerformanceConfig {
    fn default() -> Self {
        Self {
            kafka_timeout: default_kafka_timeout(),
            offset_fetch_timeout: default_offset_fetch_timeout(),
            max_concurrent_groups: default_max_concurrent_groups(),
            max_concurrent_watermarks: default_max_concurrent_watermarks(),
            client_recycle_interval: default_client_recycle_interval(),
            max_blocking_threads: default_max_blocking_threads(),
            compacted_topics_cache_ttl: default_compacted_topics_cache_ttl(),
            metadata_cache_ttl: default_metadata_cache_ttl(),
            consumer_groups_cache_ttl: default_consumer_groups_cache_ttl(),
        }
    }
}
impl Config {
pub fn load(path: Option<&str>) -> Result<Config> {
let config_path = path.unwrap_or("config.toml");
if !Path::new(config_path).exists() {
return Err(KlagError::Config(format!(
"Configuration file not found: {}",
config_path
)));
}
let content = std::fs::read_to_string(config_path)?;
let content = Self::substitute_env_vars(&content);
let config: Config = toml::from_str(&content)
.map_err(|e| KlagError::Config(format!("TOML parse error: {}", e)))?;
config.validate()?;
Ok(config)
}
fn substitute_env_vars(content: &str) -> String {
let re = Regex::new(r"\$\{\??([^}:-]+)(?::-([^}]*))?\}").unwrap();
re.replace_all(content, |caps: ®ex::Captures| {
let var_name = &caps[1];
let default_value = caps.get(2).map(|m| m.as_str()).unwrap_or("");
std::env::var(var_name).unwrap_or_else(|_| default_value.to_string())
})
.to_string()
}
pub fn validate(&self) -> Result<()> {
if self.clusters.is_empty() {
return Err(KlagError::Config(
"At least one cluster must be configured".to_string(),
));
}
for cluster in &self.clusters {
cluster.validate()?;
}
if self.exporter.performance.max_concurrent_groups == 0 {
return Err(KlagError::Config(
"performance.max_concurrent_groups must be at least 1".to_string(),
));
}
if self.exporter.performance.max_concurrent_watermarks == 0 {
return Err(KlagError::Config(
"performance.max_concurrent_watermarks must be at least 1".to_string(),
));
}
let ts = &self.exporter.timestamp_sampling;
if ts.enabled && ts.mode == TimestampSamplingMode::Message && ts.max_concurrent_fetches == 0
{
return Err(KlagError::Config(
"timestamp_sampling.max_concurrent_fetches must be >= 1 when mode = 'message'"
.to_string(),
));
}
if ts.enabled && ts.mode == TimestampSamplingMode::Rate {
if ts.rate_history_samples < 2 {
return Err(KlagError::Config(
"timestamp_sampling.rate_history_samples must be >= 2 (need two samples \
to compute a rate)"
.to_string(),
));
}
if !ts.rate_min_msgs_per_sec.is_finite() || ts.rate_min_msgs_per_sec < 0.0 {
return Err(KlagError::Config(format!(
"timestamp_sampling.rate_min_msgs_per_sec ({}) must be finite and >= 0",
ts.rate_min_msgs_per_sec
)));
}
}
let sampler_uses_blocking_threads = self.exporter.timestamp_sampling.enabled
&& matches!(
self.exporter.timestamp_sampling.mode,
TimestampSamplingMode::Message
);
let sampler_contribution = if sampler_uses_blocking_threads {
self.exporter.timestamp_sampling.max_concurrent_fetches
} else {
0
};
let min_needed = self.exporter.performance.max_concurrent_groups + sampler_contribution + 4; if self.exporter.performance.max_blocking_threads < min_needed {
let sampling_state = match (
self.exporter.timestamp_sampling.enabled,
self.exporter.timestamp_sampling.mode,
) {
(false, _) => "disabled",
(true, TimestampSamplingMode::Rate) => "rate (no FFI)",
(true, TimestampSamplingMode::Message) => "message",
};
return Err(KlagError::Config(format!(
"performance.max_blocking_threads ({}) must be >= max_concurrent_groups ({}) + \
timestamp_sampling.max_concurrent_fetches ({}, sampling {}) + 4 = {}",
self.exporter.performance.max_blocking_threads,
self.exporter.performance.max_concurrent_groups,
sampler_contribution,
sampling_state,
min_needed,
)));
}
Ok(())
}
}
impl ClusterConfig {
pub fn validate(&self) -> Result<()> {
if self.name.is_empty() {
return Err(KlagError::Config(
"Cluster name cannot be empty".to_string(),
));
}
if self.bootstrap_servers.is_empty() {
return Err(KlagError::Config(format!(
"Cluster '{}': bootstrap_servers cannot be empty",
self.name
)));
}
self.compile_filters()?;
Ok(())
}
pub fn compile_filters(&self) -> Result<CompiledFilters> {
let group_whitelist = self
.group_whitelist
.iter()
.map(|p| Regex::new(p))
.collect::<std::result::Result<Vec<_>, _>>()?;
let group_blacklist = self
.group_blacklist
.iter()
.map(|p| Regex::new(p))
.collect::<std::result::Result<Vec<_>, _>>()?;
let topic_whitelist = self
.topic_whitelist
.iter()
.map(|p| Regex::new(p))
.collect::<std::result::Result<Vec<_>, _>>()?;
let topic_blacklist = self
.topic_blacklist
.iter()
.map(|p| Regex::new(p))
.collect::<std::result::Result<Vec<_>, _>>()?;
Ok(CompiledFilters {
group_whitelist,
group_blacklist,
topic_whitelist,
topic_blacklist,
})
}
}
/// Pre-compiled group/topic filter regexes produced by
/// `ClusterConfig::compile_filters`. Whitelists select; blacklists veto.
#[derive(Debug, Clone)]
pub struct CompiledFilters {
pub group_whitelist: Vec<Regex>,
pub group_blacklist: Vec<Regex>,
pub topic_whitelist: Vec<Regex>,
pub topic_blacklist: Vec<Regex>,
}
impl CompiledFilters {
    /// A group passes when at least one whitelist pattern matches it and
    /// no blacklist pattern does.
    pub fn matches_group(&self, group: &str) -> bool {
        let whitelisted = self.group_whitelist.iter().any(|re| re.is_match(group));
        whitelisted && !self.group_blacklist.iter().any(|re| re.is_match(group))
    }

    /// Same whitelist-then-blacklist rule, applied to topic names.
    pub fn matches_topic(&self, topic: &str) -> bool {
        let whitelisted = self.topic_whitelist.iter().any(|re| re.is_match(topic));
        whitelisted && !self.topic_blacklist.iter().any(|re| re.is_match(topic))
    }
}
#[cfg(test)]
mod tests {
// Tests write TOML fixtures to temp files because `Config::load` reads
// from disk. NOTE(review): env-var tests mutate process-wide state via
// set_var/remove_var; they stay race-free only because each test uses a
// distinct variable name.
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
// Happy path: a minimal config parses and exposes its values.
#[test]
fn test_config_loads_from_file() {
let config_content = r#"
[exporter]
poll_interval = "30s"
http_port = 8000
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap())).unwrap();
assert_eq!(config.exporter.poll_interval, Duration::from_secs(30));
assert_eq!(config.exporter.http_port, 8000);
assert_eq!(config.clusters.len(), 1);
assert_eq!(config.clusters[0].name, "test");
}
// `${VAR}` is replaced by the environment variable's value.
#[test]
fn test_config_env_override() {
std::env::set_var("TEST_KAFKA_USER", "myuser");
let config_content = r#"
[exporter]
poll_interval = "30s"
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
[clusters.consumer_properties]
"sasl.username" = "${TEST_KAFKA_USER}"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap())).unwrap();
assert_eq!(
config.clusters[0].consumer_properties.get("sasl.username"),
Some(&"myuser".to_string())
);
std::env::remove_var("TEST_KAFKA_USER");
}
// `${VAR:-default}` falls back to the default when VAR is unset.
#[test]
fn test_config_env_with_default() {
std::env::remove_var("TEST_NONEXISTENT_VAR");
let config_content = r#"
[exporter]
poll_interval = "30s"
[[clusters]]
name = "test"
bootstrap_servers = "${TEST_NONEXISTENT_VAR:-localhost:9092}"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap())).unwrap();
assert_eq!(config.clusters[0].bootstrap_servers, "localhost:9092");
}
// A set variable wins over the `:-` fallback.
#[test]
fn test_config_env_override_default() {
std::env::set_var("TEST_BOOTSTRAP", "kafka:29092");
let config_content = r#"
[exporter]
poll_interval = "30s"
[[clusters]]
name = "test"
bootstrap_servers = "${TEST_BOOTSTRAP:-localhost:9092}"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap())).unwrap();
assert_eq!(config.clusters[0].bootstrap_servers, "kafka:29092");
std::env::remove_var("TEST_BOOTSTRAP");
}
// Empty bootstrap_servers is rejected by ClusterConfig::validate.
#[test]
fn test_config_validates_bootstrap_servers() {
let config_content = r#"
[exporter]
poll_interval = "30s"
[[clusters]]
name = "test"
bootstrap_servers = ""
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let result = Config::load(Some(file.path().to_str().unwrap()));
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("bootstrap_servers cannot be empty"));
}
// Whitelist-only filtering: non-matching groups are excluded.
#[test]
fn test_regex_filter_whitelist_match() {
let filters = CompiledFilters {
group_whitelist: vec![Regex::new("^my-group.*").unwrap()],
group_blacklist: vec![],
topic_whitelist: vec![Regex::new(".*").unwrap()],
topic_blacklist: vec![],
};
assert!(filters.matches_group("my-group-1"));
assert!(filters.matches_group("my-group-2"));
assert!(!filters.matches_group("other-group"));
}
// Blacklist vetoes even when the whitelist matches.
#[test]
fn test_regex_filter_blacklist_reject() {
let filters = CompiledFilters {
group_whitelist: vec![Regex::new(".*").unwrap()],
group_blacklist: vec![Regex::new("^internal-.*").unwrap()],
topic_whitelist: vec![Regex::new(".*").unwrap()],
topic_blacklist: vec![Regex::new("^__.*").unwrap()],
};
assert!(filters.matches_group("my-group"));
assert!(!filters.matches_group("internal-group"));
assert!(filters.matches_topic("my-topic"));
assert!(!filters.matches_topic("__consumer_offsets"));
}
// Every serde default kicks in when the config only names a cluster.
#[test]
fn test_default_config_values() {
let config_content = r#"
[exporter]
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap())).unwrap();
assert_eq!(config.exporter.poll_interval, Duration::from_secs(30));
assert_eq!(config.exporter.http_port, 8000);
assert_eq!(config.exporter.http_host, "0.0.0.0");
assert_eq!(config.exporter.granularity, Granularity::Topic);
assert!(config.exporter.timestamp_sampling.enabled);
assert_eq!(
config.exporter.timestamp_sampling.mode,
TimestampSamplingMode::Rate,
"default mode should be rate (Tier 3)"
);
assert!(!config.exporter.otel.enabled);
assert_eq!(
config.exporter.performance.kafka_timeout,
Duration::from_secs(30)
);
assert_eq!(
config.exporter.performance.offset_fetch_timeout,
Duration::from_secs(10)
);
assert_eq!(config.exporter.performance.max_concurrent_groups, 10);
assert_eq!(config.exporter.performance.max_concurrent_watermarks, 50);
assert_eq!(config.exporter.performance.client_recycle_interval, 50);
assert_eq!(config.exporter.performance.max_blocking_threads, 64);
assert_eq!(
config.exporter.performance.compacted_topics_cache_ttl,
Duration::from_secs(3600)
);
assert_eq!(
config.exporter.performance.metadata_cache_ttl,
Duration::from_secs(300)
);
assert_eq!(
config.exporter.performance.consumer_groups_cache_ttl,
Duration::from_secs(60)
);
}
// Explicit [exporter.performance] values override the defaults.
#[test]
fn test_performance_config_custom_values() {
let config_content = r#"
[exporter]
poll_interval = "60s"
[exporter.performance]
kafka_timeout = "15s"
offset_fetch_timeout = "5s"
max_concurrent_groups = 20
max_concurrent_watermarks = 100
client_recycle_interval = 0
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap())).unwrap();
assert_eq!(
config.exporter.performance.kafka_timeout,
Duration::from_secs(15)
);
assert_eq!(
config.exporter.performance.offset_fetch_timeout,
Duration::from_secs(5)
);
assert_eq!(config.exporter.performance.max_concurrent_groups, 20);
assert_eq!(config.exporter.performance.max_concurrent_watermarks, 100);
assert_eq!(config.exporter.performance.client_recycle_interval, 0);
}
// 16 threads < groups(10) + fetches(5) + 4 = 19, so validation rejects it.
#[test]
fn test_max_blocking_threads_rejected_when_too_small() {
let config_content = r#"
[exporter]
[exporter.timestamp_sampling]
mode = "message"
[exporter.performance]
max_blocking_threads = 16
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let err = Config::load(Some(file.path().to_str().unwrap())).unwrap_err();
let msg = err.to_string();
assert!(
msg.contains("max_blocking_threads (16)"),
"unexpected error: {msg}"
);
assert!(msg.contains("= 19"), "unexpected error: {msg}");
}
// Message mode needs at least one concurrent fetch slot.
#[test]
fn test_message_mode_rejects_zero_concurrent_fetches() {
let config_content = r#"
[exporter]
[exporter.timestamp_sampling]
mode = "message"
max_concurrent_fetches = 0
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let err = Config::load(Some(file.path().to_str().unwrap())).unwrap_err();
assert!(
err.to_string()
.contains("max_concurrent_fetches must be >= 1"),
"unexpected error: {err}"
);
}
// Rate mode needs two history samples to compute a rate.
#[test]
fn test_rate_mode_rejects_one_history_sample() {
let config_content = r#"
[exporter]
[exporter.timestamp_sampling]
rate_history_samples = 1
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let err = Config::load(Some(file.path().to_str().unwrap())).unwrap_err();
assert!(
err.to_string()
.contains("rate_history_samples must be >= 2"),
"unexpected error: {err}"
);
}
// Rate floor must be finite and non-negative.
#[test]
fn test_rate_mode_rejects_negative_min_rate() {
let config_content = r#"
[exporter]
[exporter.timestamp_sampling]
rate_min_msgs_per_sec = -1.0
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let err = Config::load(Some(file.path().to_str().unwrap())).unwrap_err();
assert!(
err.to_string().contains("rate_min_msgs_per_sec")
&& err.to_string().contains("must be finite and >= 0"),
"unexpected error: {err}"
);
}
// With sampling disabled its fetch slots don't count against the pool,
// so 16 >= 10 + 0 + 4 passes.
#[test]
fn test_max_blocking_threads_sampler_excluded_when_disabled() {
let config_content = r#"
[exporter]
[exporter.timestamp_sampling]
enabled = false
[exporter.performance]
max_blocking_threads = 16
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap()))
.expect("should accept 16 when sampling is disabled");
assert_eq!(config.exporter.performance.max_blocking_threads, 16);
}
// A generously sized pool passes validation unchanged.
#[test]
fn test_max_blocking_threads_custom_value_accepted() {
let config_content = r#"
[exporter]
[exporter.performance]
max_concurrent_groups = 30
max_blocking_threads = 128
[exporter.timestamp_sampling]
max_concurrent_fetches = 20
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let config = Config::load(Some(file.path().to_str().unwrap())).unwrap();
assert_eq!(config.exporter.performance.max_blocking_threads, 128);
}
// Zero group concurrency is rejected.
#[test]
fn test_performance_config_validates_zero_concurrency() {
let config_content = r#"
[exporter]
poll_interval = "30s"
[exporter.performance]
max_concurrent_groups = 0
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let result = Config::load(Some(file.path().to_str().unwrap()));
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("max_concurrent_groups must be at least 1"));
}
// Zero watermark concurrency is rejected.
#[test]
fn test_performance_config_validates_zero_watermarks_concurrency() {
let config_content = r#"
[exporter]
poll_interval = "30s"
[exporter.performance]
max_concurrent_watermarks = 0
[[clusters]]
name = "test"
bootstrap_servers = "localhost:9092"
"#;
let mut file = NamedTempFile::new().unwrap();
file.write_all(config_content.as_bytes()).unwrap();
let result = Config::load(Some(file.path().to_str().unwrap()));
assert!(result.is_err());
let err = result.unwrap_err().to_string();
assert!(err.contains("max_concurrent_watermarks must be at least 1"));
}
}