use std::collections::BTreeMap;
use regex::Regex;
pub(crate) const KEY_METRICS: &str = "metrics";
pub(crate) const KEY_INTERVAL_MS: &str = "interval.ms";
pub(crate) const KEY_MATCH: &str = "match";
pub(crate) const DEFAULT_INTERVAL_MS: i32 = 300_000;
pub(crate) const MIN_INTERVAL_MS: i64 = 100;
pub(crate) const MAX_INTERVAL_MS: i64 = 3_600_000;
pub(crate) const ALL_METRICS: &str = "*";
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum MatchSelector {
ClientInstanceId,
ClientId,
ClientSoftwareName,
ClientSoftwareVersion,
ClientSourceAddress,
ClientSourcePort,
}
impl MatchSelector {
fn parse(s: &str) -> Option<Self> {
Some(match s {
"client_instance_id" => Self::ClientInstanceId,
"client_id" => Self::ClientId,
"client_software_name" => Self::ClientSoftwareName,
"client_software_version" => Self::ClientSoftwareVersion,
"client_source_address" => Self::ClientSourceAddress,
"client_source_port" => Self::ClientSourcePort,
_ => return None,
})
}
}
#[derive(Debug, Clone)]
pub(crate) struct MatchRule {
pub selector: MatchSelector,
pub pattern: Regex,
}
pub(crate) fn validate(key: &str, value: &str) -> Result<(), String> {
match key {
KEY_METRICS => Ok(()),
KEY_INTERVAL_MS => {
let n: i64 = value
.parse()
.map_err(|_| format!("interval.ms must be an integer, got `{value}`"))?;
if (MIN_INTERVAL_MS..=MAX_INTERVAL_MS).contains(&n) {
Ok(())
} else {
Err(format!(
"interval.ms must be in [{MIN_INTERVAL_MS}, {MAX_INTERVAL_MS}], got {n}"
))
}
}
KEY_MATCH => parse_match_rules(value).map(|_| ()),
other => Err(format!("unknown client-metrics config key `{other}`")),
}
}
pub(crate) fn is_recognized(key: &str) -> bool {
matches!(key, KEY_METRICS | KEY_INTERVAL_MS | KEY_MATCH)
}
pub(crate) fn effective_interval_ms(configs: &BTreeMap<String, String>) -> i32 {
configs
.get(KEY_INTERVAL_MS)
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(DEFAULT_INTERVAL_MS)
}
pub(crate) fn parse_metrics(value: &str) -> Vec<String> {
if value.trim().is_empty() {
return Vec::new();
}
value
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect()
}
pub(crate) fn parse_match_rules(value: &str) -> Result<Vec<MatchRule>, String> {
if value.trim().is_empty() {
return Ok(Vec::new());
}
let mut rules = Vec::new();
for entry in value.split(',') {
let entry = entry.trim();
if entry.is_empty() {
continue;
}
let (sel, pat) = entry
.split_once('=')
.ok_or_else(|| format!("match entry `{entry}` is not `selector=regex`"))?;
let selector = MatchSelector::parse(sel.trim())
.ok_or_else(|| format!("unknown match selector `{}`", sel.trim()))?;
let pattern = Regex::new(pat.trim())
.map_err(|e| format!("invalid regex for `{}`: {e}", sel.trim()))?;
rules.push(MatchRule { selector, pattern });
}
Ok(rules)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn interval_bounds_enforced() {
assert!(validate("interval.ms", "300000").is_ok());
assert!(validate("interval.ms", "100").is_ok());
assert!(validate("interval.ms", "3600000").is_ok());
assert!(validate("interval.ms", "99").is_err());
assert!(validate("interval.ms", "3600001").is_err());
assert!(validate("interval.ms", "not-a-number").is_err());
}
#[test]
fn unknown_key_rejected() {
assert!(validate("bogus.key", "x").is_err());
}
#[test]
fn match_selectors_validated() {
assert!(validate("match", "client_id=my-app.*").is_ok());
assert!(
validate(
"match",
"client_software_name=apache-kafka-java,client_id=svc-.*"
)
.is_ok()
);
assert!(validate("match", "client_foo=x").is_err());
assert!(validate("match", "client_id").is_err());
assert!(validate("match", "client_id=[unclosed").is_err());
}
#[test]
fn metrics_list_accepts_star_and_prefixes() {
assert!(validate("metrics", "*").is_ok());
assert!(
validate(
"metrics",
"org.apache.kafka.consumer.,org.apache.kafka.producer."
)
.is_ok()
);
assert!(validate("metrics", "").is_ok());
}
#[test]
fn effective_interval_defaults_and_clamps() {
let mut m = std::collections::BTreeMap::new();
assert_eq!(effective_interval_ms(&m), DEFAULT_INTERVAL_MS);
m.insert("interval.ms".to_string(), "60000".to_string());
assert_eq!(effective_interval_ms(&m), 60_000);
}
#[test]
fn parse_match_rules_roundtrip() {
let rules = parse_match_rules("client_id=svc-.*,client_software_name=java").unwrap();
assert_eq!(rules.len(), 2);
assert!(rules.iter().any(|r| r.selector == MatchSelector::ClientId));
}
#[test]
fn parse_metrics_collapses_star() {
assert_eq!(parse_metrics("*"), vec!["*".to_string()]);
assert_eq!(parse_metrics(""), Vec::<String>::new());
assert_eq!(
parse_metrics("a.,b."),
vec!["a.".to_string(), "b.".to_string()]
);
}
}