use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap};
use std::str::FromStr;
/// Named sizing profile that selects the default consumer and producer knob
/// values returned by `consumer_defaults` and `producer_defaults`.
///
/// Serialized in snake_case (`throughput`, `balanced`, `low_latency`).
/// `Throughput` is the default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SelfRegulationProfile {
/// Largest fetch/batch sizes and longest waits of the three profiles.
#[default]
Throughput,
/// Sizes and waits between those of `Throughput` and `LowLatency`.
Balanced,
/// Smallest fetch/batch sizes and shortest waits of the three profiles.
LowLatency,
}
impl SelfRegulationProfile {
    /// Returns the consumer knob defaults for this profile.
    ///
    /// Every field of the returned [`ConsumerKnobs`] is `Some`.
    #[must_use]
    pub fn consumer_defaults(self) -> ConsumerKnobs {
        // Columns: fetch min bytes, fetch max wait (ms), per-partition fetch
        // bytes, total fetch bytes, poll record cap.
        let (min_bytes, wait_ms, partition_bytes, total_bytes, poll_records) = match self {
            Self::Throughput => (1_048_576, 50, 10_485_760, 104_857_600, 2000),
            Self::Balanced => (262_144, 25, 5_242_880, 52_428_800, 1000),
            Self::LowLatency => (1, 5, 1_048_576, 10_485_760, 500),
        };

        ConsumerKnobs {
            fetch_min_bytes: Some(min_bytes),
            fetch_max_wait_ms: Some(wait_ms),
            max_partition_fetch_bytes: Some(partition_bytes),
            fetch_max_bytes: Some(total_bytes),
            max_poll_records: Some(poll_records),
        }
    }

    /// Returns the producer knob defaults for this profile.
    ///
    /// Every field of the returned [`ProducerKnobs`] is `Some`. All profiles
    /// share `lz4` compression and an in-flight limit of 5; only batch size,
    /// linger and buffer memory vary.
    #[must_use]
    pub fn producer_defaults(self) -> ProducerKnobs {
        // Columns: batch size (bytes), linger (ms), buffer memory (bytes).
        let (batch_bytes, linger, buffer_bytes) = match self {
            Self::Throughput => (131_072, 20, 67_108_864),
            Self::Balanced => (65_536, 5, 33_554_432),
            Self::LowLatency => (16_384, 0, 16_777_216),
        };

        ProducerKnobs {
            batch_size_bytes: Some(batch_bytes),
            linger_ms: Some(linger),
            compression_type: Some("lz4".to_string()),
            buffer_memory_bytes: Some(buffer_bytes),
            max_in_flight: Some(5),
        }
    }
}
/// Optional named consumer settings.
///
/// A field left as `None` falls back to the selected profile's default when
/// `KafkaSizingConfig` resolves the effective values. Every field defaults to
/// `None` when absent from the serialized form.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ConsumerKnobs {
/// Emitted as `fetch.min.bytes` by `KafkaSizingConfig::resolved_consumer_map`.
#[serde(default)]
pub fetch_min_bytes: Option<i32>,
/// Emitted as `fetch.wait.max.ms` by `KafkaSizingConfig::resolved_consumer_map`.
#[serde(default)]
pub fetch_max_wait_ms: Option<u32>,
/// Emitted as `max.partition.fetch.bytes` by `KafkaSizingConfig::resolved_consumer_map`.
#[serde(default)]
pub max_partition_fetch_bytes: Option<i32>,
/// Emitted as `fetch.max.bytes` by `KafkaSizingConfig::resolved_consumer_map`.
#[serde(default)]
pub fetch_max_bytes: Option<i32>,
/// Not written to the librdkafka map; read through
/// `KafkaSizingConfig::effective_poll_cap`.
#[serde(default)]
pub max_poll_records: Option<usize>,
}
/// Optional named producer settings.
///
/// A field left as `None` falls back to the selected profile's default when
/// `KafkaSizingConfig::resolved_producer_map` resolves the effective values.
/// Every field defaults to `None` when absent from the serialized form.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProducerKnobs {
/// Emitted as `batch.size`.
#[serde(default)]
pub batch_size_bytes: Option<i32>,
/// Emitted as both `linger.ms` and `sticky.partitioning.linger.ms`.
#[serde(default)]
pub linger_ms: Option<u32>,
/// Emitted verbatim as `compression.type`.
#[serde(default)]
pub compression_type: Option<String>,
/// Given in bytes; divided by 1024 and emitted as `queue.buffering.max.kbytes`.
#[serde(default)]
pub buffer_memory_bytes: Option<u64>,
/// Emitted as `max.in.flight.requests.per.connection`.
#[serde(default)]
pub max_in_flight: Option<u32>,
}
/// Sizing configuration: a profile, optional named knob overrides, and raw
/// librdkafka key/value overrides for consumer and producer.
///
/// Resolution order in `resolved_consumer_map` / `resolved_producer_map`,
/// lowest to highest priority: profile default, named knob, raw map entry.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct KafkaSizingConfig {
/// Profile supplying the defaults for any knob left unset.
#[serde(default)]
pub profile: SelfRegulationProfile,
/// Named consumer overrides; `None` fields defer to the profile.
#[serde(default)]
pub consumer: ConsumerKnobs,
/// Named producer overrides; `None` fields defer to the profile.
#[serde(default)]
pub producer: ProducerKnobs,
/// Raw consumer key/value pairs copied into the resolved consumer map after
/// the named knobs, replacing any entry with the same key.
#[serde(default)]
pub consumer_librdkafka: BTreeMap<String, String>,
/// Raw producer key/value pairs copied into the resolved producer map after
/// the named knobs, replacing any entry with the same key.
#[serde(default)]
pub producer_librdkafka: BTreeMap<String, String>,
}
/// Consumer keys that trigger a warning log when they appear in
/// `KafkaSizingConfig::consumer_librdkafka`. The raw value is still applied;
/// the list only controls the warning.
const GOVERNOR_CONSUMER_KEYS: &[&str] = &[
"fetch.min.bytes",
"fetch.max.bytes",
"fetch.wait.max.ms",
"max.partition.fetch.bytes",
"fetch.message.max.bytes",
"enable.auto.commit",
];
/// Producer keys that trigger a warning log when they appear in
/// `KafkaSizingConfig::producer_librdkafka`. The raw value is still applied;
/// the list only controls the warning.
const GOVERNOR_PRODUCER_KEYS: &[&str] = &[
"batch.size",
"linger.ms",
"queue.buffering.max.ms",
"compression.type",
"compression.codec",
"queue.buffering.max.kbytes",
"max.in.flight.requests.per.connection",
"partitioner",
"sticky.partitioning.linger.ms",
];
impl KafkaSizingConfig {
    /// Raw consumer keys that name the same librdkafka property as a key the
    /// governor emits, as `(raw_name, governor_name)`.
    const CONSUMER_KEY_ALIASES: &'static [(&'static str, &'static str)] =
        &[("fetch.message.max.bytes", "max.partition.fetch.bytes")];

    /// Raw producer keys that name the same librdkafka property as a key the
    /// governor emits, as `(raw_name, governor_name)`.
    const PRODUCER_KEY_ALIASES: &'static [(&'static str, &'static str)] = &[
        ("queue.buffering.max.ms", "linger.ms"),
        ("compression.codec", "compression.type"),
    ];

    /// Largest value librdkafka accepts for `queue.buffering.max.kbytes`.
    const MAX_QUEUE_BUFFERING_KBYTES: u64 = 2_147_483_647;

    /// Removes a governor-emitted key when the raw overrides set the same
    /// property under its other name (and do not also set the governor name).
    ///
    /// Without this the map would carry two entries for one property, and
    /// which value takes effect would depend on the order the caller applies
    /// them, so the raw override could silently lose.
    fn drop_aliased_governor_keys(
        map: &mut BTreeMap<String, String>,
        raw: &BTreeMap<String, String>,
        aliases: &[(&str, &str)],
    ) {
        for &(raw_name, governor_name) in aliases {
            if raw.contains_key(raw_name) && !raw.contains_key(governor_name) {
                map.remove(governor_name);
            }
        }
    }

    /// Resolves the effective consumer settings as librdkafka key/value pairs.
    ///
    /// Priority, highest first: raw `consumer_librdkafka` entry, named knob in
    /// `consumer`, profile default, hard-coded fallback. A warning is logged
    /// for every raw entry whose key is in `GOVERNOR_CONSUMER_KEYS`; the raw
    /// value is applied regardless.
    #[must_use]
    pub fn resolved_consumer_map(&self) -> BTreeMap<String, String> {
        let profile_knobs = self.profile.consumer_defaults();
        let knobs = &self.consumer;

        let fetch_min_bytes = knobs
            .fetch_min_bytes
            .or(profile_knobs.fetch_min_bytes)
            .unwrap_or(1);
        let fetch_max_wait_ms = knobs
            .fetch_max_wait_ms
            .or(profile_knobs.fetch_max_wait_ms)
            .unwrap_or(500);
        let max_partition_fetch_bytes = knobs
            .max_partition_fetch_bytes
            .or(profile_knobs.max_partition_fetch_bytes)
            .unwrap_or(1_048_576);
        let fetch_max_bytes = knobs
            .fetch_max_bytes
            .or(profile_knobs.fetch_max_bytes)
            .unwrap_or(52_428_800);

        let mut map = BTreeMap::new();
        map.insert("fetch.min.bytes".to_string(), fetch_min_bytes.to_string());
        map.insert(
            "fetch.wait.max.ms".to_string(),
            fetch_max_wait_ms.to_string(),
        );
        map.insert(
            "max.partition.fetch.bytes".to_string(),
            max_partition_fetch_bytes.to_string(),
        );
        map.insert("fetch.max.bytes".to_string(), fetch_max_bytes.to_string());

        // Raw entries are applied last so they replace same-named knob values.
        for (k, v) in &self.consumer_librdkafka {
            if GOVERNOR_CONSUMER_KEYS.contains(&k.as_str()) {
                tracing::warn!(
                    key = k.as_str(),
                    value = v.as_str(),
                    "kafka sizing: raw consumer_librdkafka overrides a governor key"
                );
            }
            map.insert(k.clone(), v.clone());
        }
        Self::drop_aliased_governor_keys(
            &mut map,
            &self.consumer_librdkafka,
            Self::CONSUMER_KEY_ALIASES,
        );
        map
    }

    /// Resolves the effective producer settings as librdkafka key/value pairs.
    ///
    /// Priority, highest first: raw `producer_librdkafka` entry, named knob in
    /// `producer`, profile default, hard-coded fallback. The buffer size is
    /// converted from bytes to KiB and kept within `1..=2147483647`.
    /// `sticky.partitioning.linger.ms` is set to the resolved linger value.
    /// `partitioner` is never set here. A warning is logged for every raw
    /// entry whose key is in `GOVERNOR_PRODUCER_KEYS`; the raw value is
    /// applied regardless.
    #[must_use]
    pub fn resolved_producer_map(&self) -> BTreeMap<String, String> {
        let profile_knobs = self.profile.producer_defaults();
        let knobs = &self.producer;

        let batch_size_bytes = knobs
            .batch_size_bytes
            .or(profile_knobs.batch_size_bytes)
            .unwrap_or(1_000_000);
        let linger_ms = knobs.linger_ms.or(profile_knobs.linger_ms).unwrap_or(5);
        let compression_type = knobs
            .compression_type
            .clone()
            .or(profile_knobs.compression_type)
            .unwrap_or_else(|| "none".to_string());
        let buffer_memory_bytes = knobs
            .buffer_memory_bytes
            .or(profile_knobs.buffer_memory_bytes)
            .unwrap_or(1_073_741_824);
        let max_in_flight = knobs
            .max_in_flight
            .or(profile_knobs.max_in_flight)
            .unwrap_or(1_000_000);

        // librdkafka takes this limit in KiB and rejects values outside
        // 1..=2147483647, so clamp on both ends.
        let buffer_kib = (buffer_memory_bytes / 1024).clamp(1, Self::MAX_QUEUE_BUFFERING_KBYTES);

        let mut map = BTreeMap::new();
        map.insert("batch.size".to_string(), batch_size_bytes.to_string());
        map.insert("linger.ms".to_string(), linger_ms.to_string());
        map.insert("compression.type".to_string(), compression_type);
        map.insert(
            "queue.buffering.max.kbytes".to_string(),
            buffer_kib.to_string(),
        );
        map.insert(
            "max.in.flight.requests.per.connection".to_string(),
            max_in_flight.to_string(),
        );
        map.insert(
            "sticky.partitioning.linger.ms".to_string(),
            linger_ms.to_string(),
        );

        // Raw entries are applied last so they replace same-named knob values.
        for (k, v) in &self.producer_librdkafka {
            if GOVERNOR_PRODUCER_KEYS.contains(&k.as_str()) {
                tracing::warn!(
                    key = k.as_str(),
                    value = v.as_str(),
                    "kafka sizing: raw producer_librdkafka overrides a governor key"
                );
            }
            map.insert(k.clone(), v.clone());
        }
        Self::drop_aliased_governor_keys(
            &mut map,
            &self.producer_librdkafka,
            Self::PRODUCER_KEY_ALIASES,
        );
        map
    }

    /// Returns the poll record cap: the explicit `consumer.max_poll_records`
    /// if set, otherwise the profile default, otherwise 10 000.
    #[must_use]
    pub fn effective_poll_cap(&self) -> usize {
        self.consumer
            .max_poll_records
            .or(self.profile.consumer_defaults().max_poll_records)
            .unwrap_or(10_000)
    }
}
/// A pair of topic-name suffixes: one preferred, one suppressed.
///
/// NOTE(review): presumably a topic ending in `suppressed_suffix` is dropped
/// when a sibling topic ending in `preferred_suffix` exists. The code that
/// applies the rule is not in this file, so confirm against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuppressionRule {
/// Suffix of the topic that is kept.
pub preferred_suffix: String,
/// Suffix of the topic that is suppressed.
pub suppressed_suffix: String,
}
/// Deployment profile that selects which static librdkafka default table
/// `KafkaConfig::profile_defaults` returns.
///
/// Serialized in lowercase (`production`, `devtest`); `Production` is the
/// default variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum KafkaProfile {
/// Selects `PRODUCTION_PROFILE`.
#[default]
Production,
/// Selects `DEVTEST_PROFILE`; the `KafkaConfig` constructors and builders
/// that choose this profile also set `ssl_skip_verify`.
DevTest,
}
impl FromStr for KafkaProfile {
    type Err = String;

    /// Parses a profile name case-insensitively.
    ///
    /// Accepts `production`/`prod` and `devtest`/`dev`/`test`/`development`.
    ///
    /// # Errors
    /// Returns a message naming the rejected input and the valid profiles.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        const PRODUCTION_NAMES: [&str; 2] = ["production", "prod"];
        const DEVTEST_NAMES: [&str; 4] = ["devtest", "dev", "test", "development"];

        let lowered = s.to_lowercase();
        let name = lowered.as_str();

        if PRODUCTION_NAMES.contains(&name) {
            return Ok(Self::Production);
        }
        if DEVTEST_NAMES.contains(&name) {
            return Ok(Self::DevTest);
        }
        Err(format!(
            "Unknown Kafka profile: {s}. Valid: production, devtest"
        ))
    }
}
impl std::fmt::Display for KafkaProfile {
    /// Writes the canonical lowercase profile name (`production` or `devtest`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Production => "production",
            Self::DevTest => "devtest",
        };
        f.write_str(label)
    }
}
/// Builds an owned config map from a static profile table and applies
/// `overrides` on top of it.
///
/// Profile entries are inserted in slice order, so a later duplicate key
/// replaces an earlier one. Every override then replaces any profile entry
/// with the same key.
#[must_use]
pub fn merge_with_overrides<S: std::hash::BuildHasher>(
    profile: &[(&str, &str)],
    overrides: &HashMap<String, String, S>,
) -> HashMap<String, String> {
    let mut merged = HashMap::with_capacity(profile.len() + overrides.len());
    merged.extend(
        profile
            .iter()
            .map(|&(name, setting)| (name.to_string(), setting.to_string())),
    );
    merged.extend(
        overrides
            .iter()
            .map(|(name, setting)| (name.clone(), setting.clone())),
    );
    merged
}
/// librdkafka key/value defaults returned by `KafkaConfig::profile_defaults`
/// for `KafkaProfile::Production`.
pub const PRODUCTION_PROFILE: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("fetch.min.bytes", "1048576"),
("fetch.wait.max.ms", "100"),
("queued.min.messages", "20000"),
("enable.auto.commit", "false"),
("statistics.interval.ms", "1000"),
];
/// librdkafka key/value defaults returned by `KafkaConfig::profile_defaults`
/// for `KafkaProfile::DevTest`. Unlike `PRODUCTION_PROFILE` it sets short
/// reconnect backoffs and `log.connection.close`, and leaves the fetch keys
/// unset.
pub const DEVTEST_PROFILE: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("queued.min.messages", "1000"),
("enable.auto.commit", "false"),
("reconnect.backoff.ms", "10"),
("reconnect.backoff.max.ms", "100"),
("log.connection.close", "true"),
("statistics.interval.ms", "1000"),
];
/// Producer key/value table with a 100 ms linger and `zstd` compression.
pub const PRODUCER_HIGH_THROUGHPUT: &[(&str, &str)] = &[
("linger.ms", "100"),
("compression.type", "zstd"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
/// Producer key/value table that enables idempotence with `acks=all` and at
/// most 5 in-flight requests per connection.
pub const PRODUCER_EXACTLY_ONCE: &[(&str, &str)] = &[
("enable.idempotence", "true"),
("acks", "all"),
("max.in.flight.requests.per.connection", "5"),
("linger.ms", "20"),
("compression.type", "zstd"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
/// Producer key/value table with `acks=1`, no linger and `lz4` compression.
pub const PRODUCER_LOW_LATENCY: &[(&str, &str)] = &[
("acks", "1"),
("linger.ms", "0"),
("compression.type", "lz4"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
/// Minimal producer key/value table: `acks=1`, Nagle disabled, statistics on.
pub const PRODUCER_DEVTEST: &[(&str, &str)] = &[
("acks", "1"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
/// Deprecated alias of `PRODUCER_HIGH_THROUGHPUT`.
#[deprecated(since = "2.0.0", note = "Use PRODUCER_HIGH_THROUGHPUT instead")]
pub const PRODUCER_DEFAULTS: &[(&str, &str)] = PRODUCER_HIGH_THROUGHPUT;
/// Deprecated alias of `PRODUCTION_PROFILE`.
#[deprecated(since = "2.0.0", note = "Use PRODUCTION_PROFILE instead")]
pub const HIGH_THROUGHPUT_CONSUMER_DEFAULTS: &[(&str, &str)] = PRODUCTION_PROFILE;
/// Consumer key/value table with a 10 ms fetch wait and short reconnect
/// backoffs; applied by the deprecated `KafkaConfig::with_low_latency`.
pub const LOW_LATENCY_CONSUMER_DEFAULTS: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("fetch.wait.max.ms", "10"),
("queued.min.messages", "1000"),
("enable.auto.commit", "false"),
("reconnect.backoff.ms", "10"),
("reconnect.backoff.max.ms", "100"),
("statistics.interval.ms", "1000"),
];
/// Kafka client configuration.
///
/// Every field has a serde default, so any subset may be supplied when
/// deserializing. The defaults match `KafkaConfig::default()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::struct_excessive_bools)] pub struct KafkaConfig {
/// Deployment profile; selects the table returned by `profile_defaults`.
#[serde(default)]
pub profile: KafkaProfile,
/// Broker addresses. Defaults to `["localhost:9092"]`.
#[serde(default = "default_brokers")]
pub brokers: Vec<String>,
/// Consumer group id. Defaults to `hyperi-rustlib-consumer`.
#[serde(default = "default_group")]
pub group: String,
/// Client id. Defaults to `hyperi-rustlib`.
#[serde(default = "default_client_id")]
pub client_id: String,
/// Explicit topic list. Defaults to empty.
#[serde(default)]
pub topics: Vec<String>,
/// Defaults to `false`.
/// NOTE(review): presumably enables topic discovery driven by
/// `topic_include` / `topic_exclude`; the consumer is not in this file.
#[serde(default)]
pub auto_discover: bool,
/// Defaults to empty.
/// NOTE(review): presumably regex patterns for topics to include; confirm.
#[serde(default)]
pub topic_include: Vec<String>,
/// Defaults to `["^__"]`.
/// NOTE(review): presumably regex patterns for topics to exclude; confirm.
#[serde(default = "default_topic_exclude")]
pub topic_exclude: Vec<String>,
/// Defaults to 60.
/// NOTE(review): presumably the topic re-discovery interval in seconds; confirm.
#[serde(default = "default_topic_refresh_secs")]
pub topic_refresh_secs: u64,
/// Defaults to a single rule preferring `_load` over `_land`.
#[serde(default = "default_topic_suppression_rules")]
pub topic_suppression_rules: Vec<SuppressionRule>,
/// Security protocol name. Defaults to `plaintext`. `validate(true)` rejects
/// `plaintext` and `sasl_plaintext` (compared case-insensitively) unless
/// `allow_insecure_transport` is set.
#[serde(default = "default_security_protocol")]
pub security_protocol: String,
/// SASL mechanism; set by `with_scram` / `with_scram_ssl`.
#[serde(default)]
pub sasl_mechanism: Option<String>,
/// SASL username; set by `with_scram` / `with_scram_ssl`.
#[serde(default)]
pub sasl_username: Option<String>,
/// SASL password, held in the crate's `SensitiveString` wrapper.
#[serde(default)]
pub sasl_password: Option<crate::SensitiveString>,
/// CA certificate location; set by `with_tls`.
#[serde(default)]
pub ssl_ca_location: Option<String>,
/// Client certificate location; set by `with_client_cert`.
#[serde(default)]
pub ssl_certificate_location: Option<String>,
/// Client key location; set by `with_client_cert`.
#[serde(default)]
pub ssl_key_location: Option<String>,
/// When `true`, `validate(true)` fails. Set by the DevTest constructors and
/// builders.
#[serde(default)]
pub ssl_skip_verify: bool,
/// Opt-in that lets `validate(true)` accept `plaintext` / `sasl_plaintext`.
#[serde(default)]
pub allow_insecure_transport: bool,
/// Defaults to `false`.
#[serde(default)]
pub enable_auto_commit: bool,
/// Defaults to 5000.
#[serde(default = "default_auto_commit_interval")]
pub auto_commit_interval_ms: u32,
/// Defaults to 45000.
#[serde(default = "default_session_timeout")]
pub session_timeout_ms: u32,
/// Defaults to 3000.
#[serde(default = "default_heartbeat_interval")]
pub heartbeat_interval_ms: u32,
/// Defaults to 300000.
#[serde(default = "default_max_poll_interval")]
pub max_poll_interval_ms: u32,
/// Defaults to 1.
#[serde(default = "default_fetch_min_bytes")]
pub fetch_min_bytes: i32,
/// Defaults to 52428800.
#[serde(default = "default_fetch_max_bytes")]
pub fetch_max_bytes: i32,
/// Defaults to 1048576.
#[serde(default = "default_max_partition_fetch_bytes")]
pub max_partition_fetch_bytes: i32,
/// Defaults to `earliest`.
#[serde(default = "default_auto_offset_reset")]
pub auto_offset_reset: String,
/// Defaults to `false`.
#[serde(default)]
pub enable_partition_eof: bool,
/// Profile-driven sizing knobs; see `KafkaSizingConfig`.
#[serde(default)]
pub sizing: KafkaSizingConfig,
/// Raw librdkafka key/value overrides. Applied last by
/// `build_librdkafka_config`, so they win over `extra_config` and the
/// profile defaults.
#[serde(default)]
pub librdkafka_overrides: HashMap<String, String>,
/// Deprecated predecessor of `librdkafka_overrides`. Still applied by
/// `build_librdkafka_config`, after the profile defaults and before
/// `librdkafka_overrides`.
#[serde(default)]
#[deprecated(since = "1.3.0", note = "Use `librdkafka_overrides` instead")]
pub extra_config: HashMap<String, String>,
/// Defaults to empty.
/// NOTE(review): presumably filter rules for inbound messages; confirm.
#[serde(default)]
pub filters_in: Vec<crate::transport::filter::FilterRule>,
/// Defaults to empty.
/// NOTE(review): presumably filter rules for outbound messages; confirm.
#[serde(default)]
pub filters_out: Vec<crate::transport::filter::FilterRule>,
}
/// Serde default for `KafkaConfig::topic_exclude`: the single pattern `^__`.
fn default_topic_exclude() -> Vec<String> {
    vec![String::from("^__")]
}

/// Serde default for `KafkaConfig::topic_refresh_secs`.
fn default_topic_refresh_secs() -> u64 {
    const REFRESH_SECS: u64 = 60;
    REFRESH_SECS
}
/// Serde default for `KafkaConfig::topic_suppression_rules`: one rule that
/// prefers the `_load` suffix over the `_land` suffix.
fn default_topic_suppression_rules() -> Vec<SuppressionRule> {
    let load_over_land = SuppressionRule {
        preferred_suffix: String::from("_load"),
        suppressed_suffix: String::from("_land"),
    };
    vec![load_over_land]
}
/// Serde default for `KafkaConfig::brokers`.
fn default_brokers() -> Vec<String> {
    vec![String::from("localhost:9092")]
}

/// Serde default for `KafkaConfig::group`.
fn default_group() -> String {
    String::from("hyperi-rustlib-consumer")
}

/// Serde default for `KafkaConfig::client_id`.
fn default_client_id() -> String {
    String::from("hyperi-rustlib")
}

/// Serde default for `KafkaConfig::security_protocol`.
fn default_security_protocol() -> String {
    String::from("plaintext")
}

/// Serde default for `KafkaConfig::auto_commit_interval_ms`.
fn default_auto_commit_interval() -> u32 {
    5_000
}

/// Serde default for `KafkaConfig::session_timeout_ms`.
fn default_session_timeout() -> u32 {
    45_000
}

/// Serde default for `KafkaConfig::heartbeat_interval_ms`.
fn default_heartbeat_interval() -> u32 {
    3_000
}

/// Serde default for `KafkaConfig::max_poll_interval_ms`.
fn default_max_poll_interval() -> u32 {
    300_000
}

/// Serde default for `KafkaConfig::fetch_min_bytes`.
fn default_fetch_min_bytes() -> i32 {
    1
}

/// Serde default for `KafkaConfig::fetch_max_bytes` (50 MiB).
fn default_fetch_max_bytes() -> i32 {
    50 * 1024 * 1024
}

/// Serde default for `KafkaConfig::max_partition_fetch_bytes` (1 MiB).
fn default_max_partition_fetch_bytes() -> i32 {
    1024 * 1024
}

/// Serde default for `KafkaConfig::auto_offset_reset`.
fn default_auto_offset_reset() -> String {
    String::from("earliest")
}
impl Default for KafkaConfig {
    /// Builds the configuration that deserializing an empty document would
    /// produce: every field takes the same value as its serde default.
    #[allow(deprecated)] // the deprecated `extra_config` field must still be initialised
    fn default() -> Self {
        Self {
            // Identity and connection.
            profile: KafkaProfile::default(),
            brokers: default_brokers(),
            group: default_group(),
            client_id: default_client_id(),

            // Topic selection.
            topics: Vec::new(),
            auto_discover: false,
            topic_include: Vec::new(),
            topic_exclude: default_topic_exclude(),
            topic_refresh_secs: default_topic_refresh_secs(),
            topic_suppression_rules: default_topic_suppression_rules(),

            // Transport security.
            security_protocol: default_security_protocol(),
            sasl_mechanism: None,
            sasl_username: None,
            sasl_password: None,
            ssl_ca_location: None,
            ssl_certificate_location: None,
            ssl_key_location: None,
            ssl_skip_verify: false,
            allow_insecure_transport: false,

            // Consumer behaviour.
            enable_auto_commit: false,
            auto_commit_interval_ms: default_auto_commit_interval(),
            session_timeout_ms: default_session_timeout(),
            heartbeat_interval_ms: default_heartbeat_interval(),
            max_poll_interval_ms: default_max_poll_interval(),
            fetch_min_bytes: default_fetch_min_bytes(),
            fetch_max_bytes: default_fetch_max_bytes(),
            max_partition_fetch_bytes: default_max_partition_fetch_bytes(),
            auto_offset_reset: default_auto_offset_reset(),
            enable_partition_eof: false,

            // Sizing, raw overrides and filters.
            sizing: KafkaSizingConfig::default(),
            librdkafka_overrides: HashMap::new(),
            extra_config: HashMap::new(),
            filters_in: Vec::new(),
            filters_out: Vec::new(),
        }
    }
}
impl KafkaConfig {
    /// Default configuration with the `Production` profile selected.
    #[must_use]
    pub fn production() -> Self {
        Self {
            profile: KafkaProfile::Production,
            ..Default::default()
        }
    }

    /// Default configuration with the `DevTest` profile selected and TLS
    /// verification disabled (`ssl_skip_verify = true`).
    #[must_use]
    pub fn devtest() -> Self {
        Self {
            profile: KafkaProfile::DevTest,
            ssl_skip_verify: true,
            ..Default::default()
        }
    }

    /// DevTest configuration for a single broker string, a group and a topic
    /// list; TLS verification is disabled.
    #[must_use]
    pub fn for_testing(brokers: &str, group: &str, topics: Vec<String>) -> Self {
        Self {
            profile: KafkaProfile::DevTest,
            brokers: vec![brokers.to_string()],
            group: group.to_string(),
            topics,
            ssl_skip_verify: true,
            ..Default::default()
        }
    }

    /// Switches the profile. Selecting `DevTest` also sets `ssl_skip_verify`;
    /// selecting `Production` leaves `ssl_skip_verify` as it was.
    #[must_use]
    pub fn with_profile(mut self, profile: KafkaProfile) -> Self {
        self.profile = profile;
        if profile == KafkaProfile::DevTest {
            self.ssl_skip_verify = true;
        }
        self
    }

    /// Returns the static librdkafka default table for the current profile.
    #[must_use]
    pub fn profile_defaults(&self) -> &'static [(&'static str, &'static str)] {
        match self.profile {
            KafkaProfile::Production => PRODUCTION_PROFILE,
            KafkaProfile::DevTest => DEVTEST_PROFILE,
        }
    }

    /// Merges the librdkafka settings in priority order, lowest first:
    /// profile defaults, deprecated `extra_config`, `librdkafka_overrides`.
    ///
    /// Only these three sources are merged; the typed fields of the struct
    /// are not written into the returned map.
    #[must_use]
    #[allow(deprecated)] // still honours the deprecated `extra_config` field
    pub fn build_librdkafka_config(&self) -> HashMap<String, String> {
        let defaults = self.profile_defaults();
        let mut config = HashMap::with_capacity(
            defaults.len() + self.extra_config.len() + self.librdkafka_overrides.len(),
        );
        config.extend(
            defaults
                .iter()
                .map(|&(key, value)| (key.to_string(), value.to_string())),
        );
        config.extend(
            self.extra_config
                .iter()
                .map(|(key, value)| (key.clone(), value.clone())),
        );
        config.extend(
            self.librdkafka_overrides
                .iter()
                .map(|(key, value)| (key.clone(), value.clone())),
        );
        config
    }

    /// Sets one raw librdkafka override, replacing any earlier value for `key`.
    #[must_use]
    pub fn with_override(mut self, key: &str, value: &str) -> Self {
        self.librdkafka_overrides
            .insert(key.to_string(), value.to_string());
        self
    }

    /// Sets several raw librdkafka overrides, replacing earlier values.
    #[must_use]
    pub fn with_overrides(mut self, overrides: &[(&str, &str)]) -> Self {
        self.librdkafka_overrides.extend(
            overrides
                .iter()
                .map(|&(key, value)| (key.to_string(), value.to_string())),
        );
        self
    }

    /// Case-insensitive comparison of `security_protocol`, matching how
    /// `validate` treats the field.
    fn protocol_is(&self, name: &str) -> bool {
        self.security_protocol.eq_ignore_ascii_case(name)
    }

    /// Moves an unencrypted protocol to its TLS counterpart
    /// (`plaintext` -> `ssl`, `sasl_plaintext` -> `sasl_ssl`); any other value
    /// is left untouched.
    fn upgrade_protocol_to_tls(&mut self) {
        if self.protocol_is("plaintext") {
            self.security_protocol = "ssl".to_string();
        } else if self.protocol_is("sasl_plaintext") {
            self.security_protocol = "sasl_ssl".to_string();
        }
    }

    /// Stores the SASL mechanism, username and password.
    fn set_sasl_credentials(&mut self, mechanism: &str, username: &str, password: &str) {
        self.sasl_mechanism = Some(mechanism.to_string());
        self.sasl_username = Some(username.to_string());
        self.sasl_password = Some(crate::SensitiveString::new(password));
    }

    /// Configures SASL authentication.
    ///
    /// The protocol becomes `sasl_plaintext`, unless the configuration already
    /// uses TLS (`ssl` or `sasl_ssl`), in which case it becomes `sasl_ssl`.
    /// Calling this after `with_tls` therefore never drops encryption.
    #[must_use]
    pub fn with_scram(mut self, mechanism: &str, username: &str, password: &str) -> Self {
        let already_tls = self.protocol_is("ssl") || self.protocol_is("sasl_ssl");
        let protocol = if already_tls {
            "sasl_ssl"
        } else {
            "sasl_plaintext"
        };
        self.security_protocol = protocol.to_string();
        self.set_sasl_credentials(mechanism, username, password);
        self
    }

    /// Configures SASL authentication over TLS (`sasl_ssl`).
    #[must_use]
    pub fn with_scram_ssl(mut self, mechanism: &str, username: &str, password: &str) -> Self {
        self.security_protocol = "sasl_ssl".to_string();
        self.set_sasl_credentials(mechanism, username, password);
        self
    }

    /// Enables TLS by upgrading an unencrypted protocol to its TLS
    /// counterpart, and sets (or clears, when `None`) the CA location.
    #[must_use]
    pub fn with_tls(mut self, ca_location: Option<&str>) -> Self {
        self.upgrade_protocol_to_tls();
        self.ssl_ca_location = ca_location.map(String::from);
        self
    }

    /// Sets the client certificate and key locations.
    #[must_use]
    pub fn with_client_cert(mut self, cert_location: &str, key_location: &str) -> Self {
        self.ssl_certificate_location = Some(cert_location.to_string());
        self.ssl_key_location = Some(key_location.to_string());
        self
    }

    /// Disables TLS verification without changing the protocol.
    #[must_use]
    pub fn with_ssl_skip_verify(mut self) -> Self {
        self.ssl_skip_verify = true;
        self
    }

    /// Checks the configuration against production safety rules.
    ///
    /// When `is_production` is `false` nothing is checked.
    ///
    /// # Errors
    /// In production, returns a message when `ssl_skip_verify` is set, or when
    /// the protocol is `plaintext` / `sasl_plaintext` (case-insensitive) and
    /// `allow_insecure_transport` is not set.
    pub fn validate(&self, is_production: bool) -> Result<(), String> {
        if !is_production {
            return Ok(());
        }
        if self.ssl_skip_verify {
            return Err(
                "kafka: ssl_skip_verify (TLS verification disabled) is not permitted \
                 in production -- configure ssl_ca_location for private-CA trust instead"
                    .to_string(),
            );
        }
        let unencrypted = self.protocol_is("plaintext") || self.protocol_is("sasl_plaintext");
        if unencrypted && !self.allow_insecure_transport {
            return Err(format!(
                "kafka: security_protocol='{}' sends data/credentials unencrypted and is not \
                 permitted in production -- use 'ssl'/'sasl_ssl', or set \
                 allow_insecure_transport=true to deliberately opt in (e.g. mesh-encrypted \
                 in-cluster traffic)",
                self.security_protocol
            ));
        }
        Ok(())
    }

    /// Enables TLS (same protocol upgrade as `with_tls`) and disables
    /// certificate verification.
    #[must_use]
    pub fn with_ssl_insecure(mut self) -> Self {
        self.upgrade_protocol_to_tls();
        self.ssl_skip_verify = true;
        self
    }

    /// Adds the `PRODUCER_HIGH_THROUGHPUT` entries to `extra_config`, keeping
    /// any value already present for a key.
    #[must_use]
    #[deprecated(since = "2.0.0", note = "Use producer profile constants directly")]
    #[allow(deprecated)]
    pub fn with_producer_defaults(mut self) -> Self {
        for (key, value) in PRODUCER_HIGH_THROUGHPUT {
            self.extra_config
                .entry((*key).to_string())
                .or_insert_with(|| (*value).to_string());
        }
        self
    }

    /// Adds the `PRODUCTION_PROFILE` entries to `extra_config`, keeping any
    /// value already present for a key.
    #[must_use]
    #[deprecated(since = "2.0.0", note = "Use KafkaConfig::production() instead")]
    #[allow(deprecated)]
    pub fn with_high_throughput(mut self) -> Self {
        for (key, value) in PRODUCTION_PROFILE {
            self.extra_config
                .entry((*key).to_string())
                .or_insert_with(|| (*value).to_string());
        }
        self
    }

    /// Adds the `LOW_LATENCY_CONSUMER_DEFAULTS` entries to `extra_config`,
    /// keeping any value already present for a key.
    #[must_use]
    #[deprecated(since = "2.0.0", note = "Use LOW_LATENCY_CONSUMER_DEFAULTS directly")]
    #[allow(deprecated)]
    pub fn with_low_latency(mut self) -> Self {
        for (key, value) in LOW_LATENCY_CONSUMER_DEFAULTS {
            self.extra_config
                .entry((*key).to_string())
                .or_insert_with(|| (*value).to_string());
        }
        self
    }

    /// Sets the `statistics.interval.ms` override, replacing any earlier value.
    #[must_use]
    pub fn with_statistics(mut self, interval_ms: u32) -> Self {
        self.librdkafka_overrides.insert(
            "statistics.interval.ms".to_string(),
            interval_ms.to_string(),
        );
        self
    }

    /// Adds keepalive, metadata-age and connection-timeout overrides, keeping
    /// any value already present in `librdkafka_overrides` for a key.
    #[must_use]
    pub fn with_cloud_connection_tuning(mut self) -> Self {
        let cloud_settings = [
            ("socket.keepalive.enable", "true"),
            ("metadata.max.age.ms", "180000"),
            ("socket.connection.setup.timeout.ms", "30000"),
            ("connections.max.idle.ms", "540000"),
        ];
        for (key, value) in cloud_settings {
            self.librdkafka_overrides
                .entry(key.to_string())
                .or_insert_with(|| value.to_string());
        }
        self
    }

    /// Builds a configuration from environment variables, starting from
    /// `Self::default()`.
    ///
    /// Each setting is looked up as `{prefix}_{NAME}`, with `{prefix}_{LEGACY}`
    /// names and `KAFKA_{NAME}` registered as legacy names on the `EnvVar`
    /// (presumably consulted as fallbacks; see `EnvVar`). Unset variables
    /// leave the default untouched. A profile value that does not parse is
    /// ignored with a warning. A `DevTest` profile sets `ssl_skip_verify`,
    /// which an explicit `SSL_SKIP_VERIFY` variable can then override.
    #[cfg(feature = "config")]
    #[must_use]
    pub fn from_env(prefix: &str) -> Self {
        use crate::config::env_compat::EnvVar;
        let mut config = Self::default();
        let prefixed = |name: &str, legacy: &[&str]| {
            let mut var = EnvVar::new(&format!("{prefix}_{name}"));
            for l in legacy {
                var = var.with_legacy(&format!("{prefix}_{l}"));
            }
            var = var.with_legacy(&format!("KAFKA_{name}"));
            var
        };

        if let Some(val) = prefixed("PROFILE", &[]).get() {
            match val.parse::<KafkaProfile>() {
                Ok(profile) => {
                    config.profile = profile;
                    if config.profile == KafkaProfile::DevTest {
                        config.ssl_skip_verify = true;
                    }
                }
                // Keep the default profile, but make the typo visible.
                Err(err) => tracing::warn!(
                    error = err.as_str(),
                    "kafka: ignoring unrecognised profile from environment"
                ),
            }
        }
        if let Some(brokers) = prefixed("BOOTSTRAP_SERVERS", &["BROKERS"]).get_list() {
            config.brokers = brokers;
        }
        if let Some(val) = prefixed("GROUP_ID", &["GROUP", "CONSUMER_GROUP"]).get() {
            config.group = val;
        }
        if let Some(val) = prefixed("CLIENT_ID", &[]).get() {
            config.client_id = val;
        }
        if let Some(val) = prefixed("SECURITY_PROTOCOL", &[]).get() {
            config.security_protocol = val;
        }
        if let Some(val) = prefixed("SASL_MECHANISM", &[]).get() {
            config.sasl_mechanism = Some(val);
        }
        if let Some(val) = prefixed("SASL_USERNAME", &["SASL_USER"]).get() {
            config.sasl_username = Some(val);
        }
        if let Some(val) = prefixed("SASL_PASSWORD", &[]).get() {
            config.sasl_password = Some(crate::SensitiveString::from(val));
        }
        if let Some(val) = prefixed("SSL_CA_LOCATION", &["CA_CERT", "SSL_CA"]).get() {
            config.ssl_ca_location = Some(val);
        }
        if let Some(val) = prefixed("SSL_SKIP_VERIFY", &["SSL_INSECURE", "INSECURE"]).get_bool() {
            config.ssl_skip_verify = val;
        }
        if let Some(topics) = prefixed("TOPICS", &["TOPIC"]).get_list() {
            config.topics = topics;
        }
        config
    }

    /// Same as `from_env("KAFKA")`.
    #[cfg(feature = "config")]
    #[must_use]
    pub fn from_env_standard() -> Self {
        Self::from_env("KAFKA")
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Sizing config that selects `profile` and leaves every knob unset.
    fn sizing_for_profile(profile: SelfRegulationProfile) -> KafkaSizingConfig {
        KafkaSizingConfig {
            profile,
            ..Default::default()
        }
    }

    // ---- KafkaConfig::validate -------------------------------------------

    #[test]
    fn validate_rejects_ssl_skip_verify_in_production() {
        let dev_cfg = KafkaConfig::devtest();
        assert!(dev_cfg.ssl_skip_verify);
        assert!(dev_cfg.validate(false).is_ok(), "dev allows skip_verify");
        assert!(
            dev_cfg.validate(true).is_err(),
            "production must reject ssl_skip_verify"
        );

        let prod_cfg = KafkaConfig {
            security_protocol: "ssl".to_string(),
            ..Default::default()
        };
        assert!(!prod_cfg.ssl_skip_verify);
        assert!(prod_cfg.validate(true).is_ok());
    }

    #[test]
    fn validate_rejects_unencrypted_transport_in_production() {
        let plain = KafkaConfig::default();
        assert_eq!(plain.security_protocol, "plaintext");
        assert!(plain.validate(false).is_ok(), "dev allows plaintext");
        assert!(
            plain.validate(true).is_err(),
            "production must reject plaintext transport"
        );

        let sasl_plain = KafkaConfig {
            security_protocol: "sasl_plaintext".to_string(),
            ..Default::default()
        };
        assert!(sasl_plain.validate(true).is_err());

        let explicit_opt_in = KafkaConfig {
            security_protocol: "plaintext".to_string(),
            allow_insecure_transport: true,
            ..Default::default()
        };
        assert!(
            explicit_opt_in.validate(true).is_ok(),
            "allow_insecure_transport opts into plaintext in prod"
        );
    }

    // ---- KafkaConfig defaults --------------------------------------------

    #[test]
    fn kafka_config_topic_resolution_defaults() {
        let defaults = KafkaConfig::default();
        assert!(defaults.topic_include.is_empty());
        assert_eq!(defaults.topic_exclude, vec!["^__".to_string()]);
        assert!(!defaults.auto_discover);
        assert_eq!(defaults.topic_refresh_secs, 60);
        assert_eq!(defaults.topic_suppression_rules.len(), 1);

        let rule = &defaults.topic_suppression_rules[0];
        assert_eq!(rule.preferred_suffix, "_load");
        assert_eq!(rule.suppressed_suffix, "_land");
    }

    #[test]
    fn kafka_config_default_has_sizing_field() {
        let defaults = KafkaConfig::default();
        assert_eq!(defaults.sizing.profile, SelfRegulationProfile::Throughput);
        assert!(defaults.sizing.consumer_librdkafka.is_empty());
        assert!(defaults.sizing.producer_librdkafka.is_empty());
    }

    // ---- Consumer profile defaults ---------------------------------------

    #[test]
    fn throughput_profile_consumer_knobs() {
        let sizing = sizing_for_profile(SelfRegulationProfile::Throughput);
        let resolved = sizing.resolved_consumer_map();
        assert_eq!(
            resolved["fetch.min.bytes"], "1048576",
            "1 MiB fetch.min.bytes"
        );
        assert_eq!(resolved["fetch.wait.max.ms"], "50");
        assert_eq!(
            resolved["max.partition.fetch.bytes"], "10485760",
            "10 MiB per-partition"
        );
        assert_eq!(resolved["fetch.max.bytes"], "104857600", "100 MiB total");
        assert_eq!(
            sizing.effective_poll_cap(),
            2000,
            "throughput poll-safety cap = 2000"
        );
    }

    #[test]
    fn low_latency_profile_consumer_knobs() {
        let sizing = sizing_for_profile(SelfRegulationProfile::LowLatency);
        let resolved = sizing.resolved_consumer_map();
        assert_eq!(resolved["fetch.min.bytes"], "1", "no batching threshold");
        assert_eq!(resolved["fetch.wait.max.ms"], "5", "return fast");
        assert_eq!(resolved["max.partition.fetch.bytes"], "1048576", "1 MiB");
        assert_eq!(resolved["fetch.max.bytes"], "10485760", "10 MiB total");
        assert_eq!(sizing.effective_poll_cap(), 500);
    }

    #[test]
    fn balanced_profile_consumer_knobs() {
        let sizing = sizing_for_profile(SelfRegulationProfile::Balanced);
        let resolved = sizing.resolved_consumer_map();
        // These local names appear in the assertion message; keep them.
        let fmb: i32 = resolved["fetch.min.bytes"].parse().unwrap();
        let ll_min: i32 = 1;
        let tp_min: i32 = 1_048_576;
        assert!(
            fmb > ll_min && fmb < tp_min,
            "balanced fetch.min.bytes={fmb} should be between low_latency({ll_min}) and throughput({tp_min})"
        );
        assert_eq!(sizing.effective_poll_cap(), 1000);
    }

    #[test]
    fn throughput_vs_low_latency_consumer_differ() {
        let tp = sizing_for_profile(SelfRegulationProfile::Throughput).resolved_consumer_map();
        let ll = sizing_for_profile(SelfRegulationProfile::LowLatency).resolved_consumer_map();
        for key in ["fetch.min.bytes", "fetch.wait.max.ms", "fetch.max.bytes"] {
            assert_ne!(
                tp[key], ll[key],
                "throughput and low_latency must differ on {key}"
            );
        }
    }

    // ---- Producer profile defaults ---------------------------------------

    #[test]
    fn throughput_profile_producer_knobs() {
        let resolved = sizing_for_profile(SelfRegulationProfile::Throughput).resolved_producer_map();
        assert_eq!(resolved["batch.size"], "131072", "128 KiB batch");
        assert_eq!(resolved["linger.ms"], "20");
        assert_eq!(resolved["compression.type"], "lz4");
        assert_eq!(resolved["queue.buffering.max.kbytes"], "65536");
        assert_eq!(resolved["max.in.flight.requests.per.connection"], "5");
    }

    #[test]
    fn low_latency_profile_producer_knobs() {
        let resolved = sizing_for_profile(SelfRegulationProfile::LowLatency).resolved_producer_map();
        assert_eq!(resolved["linger.ms"], "0", "send immediately");
        assert_eq!(resolved["compression.type"], "lz4");
        let batch_bytes: i32 = resolved["batch.size"].parse().unwrap();
        assert!(
            batch_bytes < 131_072,
            "low_latency batch should be < throughput"
        );
    }

    #[test]
    fn throughput_vs_low_latency_producer_differ() {
        let tp = sizing_for_profile(SelfRegulationProfile::Throughput).resolved_producer_map();
        let ll = sizing_for_profile(SelfRegulationProfile::LowLatency).resolved_producer_map();
        for key in ["batch.size", "linger.ms", "queue.buffering.max.kbytes"] {
            assert_ne!(
                tp[key], ll[key],
                "throughput and low_latency must differ on {key}"
            );
        }
    }

    #[test]
    fn producer_map_sets_sticky_partitioning_linger() {
        let resolved = sizing_for_profile(SelfRegulationProfile::Throughput).resolved_producer_map();
        assert!(
            resolved.contains_key("sticky.partitioning.linger.ms"),
            "sticky.partitioning.linger.ms must be present"
        );
        assert_eq!(
            resolved["sticky.partitioning.linger.ms"], resolved["linger.ms"],
            "sticky linger must track linger.ms"
        );
        assert!(
            !resolved.contains_key("partitioner"),
            "producer map must NOT set partitioner to preserve caller's choice"
        );
    }

    #[test]
    fn compression_type_present_in_all_profiles() {
        let every_profile = [
            SelfRegulationProfile::Throughput,
            SelfRegulationProfile::Balanced,
            SelfRegulationProfile::LowLatency,
        ];
        for profile in every_profile {
            let map = sizing_for_profile(profile).resolved_producer_map();
            assert!(
                map.contains_key("compression.type"),
                "profile {profile:?} must set compression.type"
            );
            assert!(
                !map["compression.type"].is_empty(),
                "compression.type must not be empty"
            );
        }
    }

    // ---- Named knobs beat profile defaults -------------------------------

    #[test]
    fn explicit_consumer_knob_beats_profile() {
        let sizing = KafkaSizingConfig {
            profile: SelfRegulationProfile::Throughput,
            consumer: ConsumerKnobs {
                fetch_min_bytes: Some(2_097_152),
                ..Default::default()
            },
            ..Default::default()
        };
        let resolved = sizing.resolved_consumer_map();
        assert_eq!(
            resolved["fetch.min.bytes"], "2097152",
            "explicit override must win over profile default"
        );
        // Knobs left unset still come from the profile.
        assert_eq!(resolved["fetch.wait.max.ms"], "50");
    }

    #[test]
    fn explicit_producer_knob_beats_profile() {
        let sizing = KafkaSizingConfig {
            profile: SelfRegulationProfile::Throughput,
            producer: ProducerKnobs {
                linger_ms: Some(99),
                compression_type: Some("zstd".to_string()),
                ..Default::default()
            },
            ..Default::default()
        };
        let resolved = sizing.resolved_producer_map();
        assert_eq!(resolved["linger.ms"], "99");
        assert_eq!(resolved["compression.type"], "zstd");
        assert_eq!(resolved["sticky.partitioning.linger.ms"], "99");
        // Knobs left unset still come from the profile.
        assert_eq!(resolved["batch.size"], "131072");
    }

    #[test]
    fn poll_cap_override_beats_profile() {
        let sizing = KafkaSizingConfig {
            profile: SelfRegulationProfile::Throughput,
            consumer: ConsumerKnobs {
                max_poll_records: Some(500),
                ..Default::default()
            },
            ..Default::default()
        };
        assert_eq!(sizing.effective_poll_cap(), 500);
    }

    #[test]
    fn poll_cap_absent_falls_back_to_profile() {
        let sizing = sizing_for_profile(SelfRegulationProfile::Throughput);
        assert_eq!(sizing.effective_poll_cap(), 2000);
    }

    #[test]
    fn buffer_memory_converts_to_kib() {
        let sizing = KafkaSizingConfig {
            profile: SelfRegulationProfile::Throughput,
            producer: ProducerKnobs {
                buffer_memory_bytes: Some(1_048_576),
                ..Default::default()
            },
            ..Default::default()
        };
        let resolved = sizing.resolved_producer_map();
        assert_eq!(
            resolved["queue.buffering.max.kbytes"], "1024",
            "1 MiB = 1024 KiB"
        );
    }

    // ---- Raw librdkafka entries beat named knobs -------------------------

    #[test]
    fn raw_consumer_librdkafka_wins_over_named_knob() {
        let raw_consumer: BTreeMap<String, String> =
            [("fetch.min.bytes".to_string(), "9999".to_string())]
                .into_iter()
                .collect();
        let sizing = KafkaSizingConfig {
            profile: SelfRegulationProfile::Throughput,
            consumer: ConsumerKnobs {
                fetch_min_bytes: Some(2_097_152),
                ..Default::default()
            },
            consumer_librdkafka: raw_consumer,
            ..Default::default()
        };
        let resolved = sizing.resolved_consumer_map();
        assert_eq!(
            resolved["fetch.min.bytes"], "9999",
            "raw consumer_librdkafka must win over named knob"
        );
    }

    #[test]
    fn raw_producer_librdkafka_wins_over_named_knob() {
        let raw_producer: BTreeMap<String, String> = [
            ("linger.ms".to_string(), "777".to_string()),
            ("compression.type".to_string(), "gzip".to_string()),
        ]
        .into_iter()
        .collect();
        let sizing = KafkaSizingConfig {
            profile: SelfRegulationProfile::Throughput,
            producer: ProducerKnobs {
                linger_ms: Some(20),
                compression_type: Some("lz4".to_string()),
                ..Default::default()
            },
            producer_librdkafka: raw_producer,
            ..Default::default()
        };
        let resolved = sizing.resolved_producer_map();
        assert_eq!(
            resolved["linger.ms"], "777",
            "raw producer_librdkafka linger must win"
        );
        assert_eq!(
            resolved["compression.type"], "gzip",
            "raw producer_librdkafka compression must win"
        );
    }

    // ---- Serialization ----------------------------------------------------

    #[test]
    fn sizing_profile_serialises_snake_case() {
        let low_latency = serde_json::to_string(&SelfRegulationProfile::LowLatency).unwrap();
        assert_eq!(low_latency, "\"low_latency\"");
        let throughput = serde_json::to_string(&SelfRegulationProfile::Throughput).unwrap();
        assert_eq!(throughput, "\"throughput\"");
        let balanced = serde_json::to_string(&SelfRegulationProfile::Balanced).unwrap();
        assert_eq!(balanced, "\"balanced\"");
    }
}