use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
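/// A rule for suppressing one topic variant in favour of another.
///
/// Intended for topic discovery: when two discovered topics differ only in
/// suffix, the one ending in `suppressed_suffix` is dropped in favour of the
/// one ending in `preferred_suffix`. The default rule prefers `*_load`
/// topics over their `*_land` counterparts.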
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SuppressionRule {
pub preferred_suffix: String,
pub suppressed_suffix: String,
}
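/// Baseline tuning profile, selecting between [`PRODUCTION_PROFILE`] and
/// [`DEVTEST_PROFILE`] librdkafka defaults.
///
/// Serializes in lowercase (`"production"` / `"devtest"`); parsing via
/// [`FromStr`] additionally accepts `prod`, `dev`, `test`, and
/// `development`.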
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum KafkaProfile {
#[default]
Production,
DevTest,
}
impl FromStr for KafkaProfile {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_lowercase().as_str() {
"production" | "prod" => Ok(Self::Production),
"devtest" | "dev" | "test" | "development" => Ok(Self::DevTest),
_ => Err(format!(
"Unknown Kafka profile: {s}. Valid values: production (prod), devtest (dev, test, development)"
)),
}
}
}
impl std::fmt::Display for KafkaProfile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Production => write!(f, "production"),
Self::DevTest => write!(f, "devtest"),
}
}
}
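/// Merges a static profile slice into an owned map, applying `overrides`
/// last so they win on key collisions.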
#[must_use]
pub fn merge_with_overrides<S: std::hash::BuildHasher>(
profile: &[(&str, &str)],
overrides: &HashMap<String, String, S>,
) -> HashMap<String, String> {
let mut config = HashMap::with_capacity(profile.len() + overrides.len());
for (key, value) in profile {
config.insert((*key).to_string(), (*value).to_string());
}
for (key, value) in overrides {
config.insert(key.clone(), value.clone());
}
config
}
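/// Consumer baseline for production: larger fetch batches (1 MiB
/// `fetch.min.bytes`) traded against up to 100 ms of fetch latency, with
/// auto-commit disabled and statistics emitted every second.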
pub const PRODUCTION_PROFILE: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("fetch.min.bytes", "1048576"),
("fetch.wait.max.ms", "100"),
("queued.min.messages", "20000"),
("enable.auto.commit", "false"),
("statistics.interval.ms", "1000"),
];
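/// Consumer baseline for development and tests: a small local queue and
/// aggressive reconnect backoff (10-100 ms) so restarted brokers are picked
/// up quickly; connection-close logging stays on to aid debugging.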
pub const DEVTEST_PROFILE: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("queued.min.messages", "1000"),
("enable.auto.commit", "false"),
("reconnect.backoff.ms", "10"),
("reconnect.backoff.max.ms", "100"),
("log.connection.close", "true"),
("statistics.interval.ms", "1000"),
];
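/// Producer preset that favours throughput: up to 100 ms of batching
/// (`linger.ms`) with zstd compression and Nagle's algorithm disabled.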
pub const PRODUCER_HIGH_THROUGHPUT: &[(&str, &str)] = &[
("linger.ms", "100"),
("compression.type", "zstd"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
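/// Producer preset for exactly-once delivery: idempotence with `acks=all`
/// and at most five in-flight requests per connection, plus moderate
/// batching and zstd compression.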
pub const PRODUCER_EXACTLY_ONCE: &[(&str, &str)] = &[
("enable.idempotence", "true"),
("acks", "all"),
("max.in.flight.requests.per.connection", "5"),
("linger.ms", "20"),
("compression.type", "zstd"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
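/// Producer preset that favours latency: no batching, leader-only acks, and
/// the cheaper lz4 codec.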
pub const PRODUCER_LOW_LATENCY: &[(&str, &str)] = &[
("acks", "1"),
("linger.ms", "0"),
("compression.type", "lz4"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
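/// Producer preset for development and tests: leader-only acks, with
/// compression left at the librdkafka default.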
pub const PRODUCER_DEVTEST: &[(&str, &str)] = &[
("acks", "1"),
("socket.nagle.disable", "true"),
("statistics.interval.ms", "1000"),
];
#[deprecated(since = "2.0.0", note = "Use PRODUCER_HIGH_THROUGHPUT instead")]
pub const PRODUCER_DEFAULTS: &[(&str, &str)] = PRODUCER_HIGH_THROUGHPUT;
#[deprecated(since = "2.0.0", note = "Use PRODUCTION_PROFILE instead")]
pub const HIGH_THROUGHPUT_CONSUMER_DEFAULTS: &[(&str, &str)] = PRODUCTION_PROFILE;
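/// Consumer preset that favours latency: short fetch waits (10 ms), a small
/// local queue, and fast reconnect backoff.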
pub const LOW_LATENCY_CONSUMER_DEFAULTS: &[(&str, &str)] = &[
("partition.assignment.strategy", "cooperative-sticky"),
("fetch.wait.max.ms", "10"),
("queued.min.messages", "1000"),
("enable.auto.commit", "false"),
("reconnect.backoff.ms", "10"),
("reconnect.backoff.max.ms", "100"),
("statistics.interval.ms", "1000"),
];
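/// Declarative Kafka settings, flattened into a librdkafka property map via
/// [`KafkaConfig::build_librdkafka_config`].
///
/// Every field carries a serde default, so a partial (or empty) config file
/// deserializes successfully.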
#[derive(Debug, Clone, Serialize, Deserialize)]
#[allow(clippy::struct_excessive_bools)]
pub struct KafkaConfig {
#[serde(default)]
pub profile: KafkaProfile,
#[serde(default = "default_brokers")]
pub brokers: Vec<String>,
#[serde(default = "default_group")]
pub group: String,
#[serde(default = "default_client_id")]
pub client_id: String,
#[serde(default)]
pub topics: Vec<String>,
#[serde(default)]
pub auto_discover: bool,
#[serde(default)]
pub topic_include: Vec<String>,
#[serde(default = "default_topic_exclude")]
pub topic_exclude: Vec<String>,
#[serde(default = "default_topic_refresh_secs")]
pub topic_refresh_secs: u64,
#[serde(default = "default_topic_suppression_rules")]
pub topic_suppression_rules: Vec<SuppressionRule>,
#[serde(default = "default_security_protocol")]
pub security_protocol: String,
#[serde(default)]
pub sasl_mechanism: Option<String>,
#[serde(default)]
pub sasl_username: Option<String>,
#[serde(default)]
pub sasl_password: Option<crate::SensitiveString>,
#[serde(default)]
pub ssl_ca_location: Option<String>,
#[serde(default)]
pub ssl_certificate_location: Option<String>,
#[serde(default)]
pub ssl_key_location: Option<String>,
#[serde(default)]
pub ssl_skip_verify: bool,
#[serde(default)]
pub enable_auto_commit: bool,
#[serde(default = "default_auto_commit_interval")]
pub auto_commit_interval_ms: u32,
#[serde(default = "default_session_timeout")]
pub session_timeout_ms: u32,
#[serde(default = "default_heartbeat_interval")]
pub heartbeat_interval_ms: u32,
#[serde(default = "default_max_poll_interval")]
pub max_poll_interval_ms: u32,
#[serde(default = "default_fetch_min_bytes")]
pub fetch_min_bytes: i32,
#[serde(default = "default_fetch_max_bytes")]
pub fetch_max_bytes: i32,
#[serde(default = "default_max_partition_fetch_bytes")]
pub max_partition_fetch_bytes: i32,
#[serde(default = "default_auto_offset_reset")]
pub auto_offset_reset: String,
#[serde(default)]
pub enable_partition_eof: bool,
#[serde(default)]
pub librdkafka_overrides: HashMap<String, String>,
#[serde(default)]
#[deprecated(since = "1.3.0", note = "Use `librdkafka_overrides` instead")]
pub extra_config: HashMap<String, String>,
#[serde(default)]
pub filters_in: Vec<crate::transport::filter::FilterRule>,
#[serde(default)]
pub filters_out: Vec<crate::transport::filter::FilterRule>,
}
fn default_topic_exclude() -> Vec<String> {
vec!["^__".to_string()]
}
fn default_topic_refresh_secs() -> u64 {
60
}
fn default_topic_suppression_rules() -> Vec<SuppressionRule> {
vec![SuppressionRule {
preferred_suffix: "_load".into(),
suppressed_suffix: "_land".into(),
}]
}
fn default_brokers() -> Vec<String> {
vec!["localhost:9092".to_string()]
}
fn default_group() -> String {
"hyperi-rustlib-consumer".to_string()
}
fn default_client_id() -> String {
"hyperi-rustlib".to_string()
}
fn default_security_protocol() -> String {
"plaintext".to_string()
}
fn default_auto_commit_interval() -> u32 {
5000
}
fn default_session_timeout() -> u32 {
45000
}
fn default_heartbeat_interval() -> u32 {
3000
}
fn default_max_poll_interval() -> u32 {
300_000
}
fn default_fetch_min_bytes() -> i32 {
1
}
fn default_fetch_max_bytes() -> i32 {
52_428_800
}
fn default_max_partition_fetch_bytes() -> i32 {
1_048_576
}
fn default_auto_offset_reset() -> String {
"earliest".to_string()
}
impl Default for KafkaConfig {
fn default() -> Self {
#[allow(deprecated)]
Self {
profile: KafkaProfile::default(),
brokers: default_brokers(),
group: default_group(),
client_id: default_client_id(),
topics: Vec::new(),
auto_discover: false,
topic_include: Vec::new(),
topic_exclude: default_topic_exclude(),
topic_refresh_secs: default_topic_refresh_secs(),
topic_suppression_rules: default_topic_suppression_rules(),
security_protocol: default_security_protocol(),
sasl_mechanism: None,
sasl_username: None,
sasl_password: None,
ssl_ca_location: None,
ssl_certificate_location: None,
ssl_key_location: None,
ssl_skip_verify: false,
enable_auto_commit: false,
auto_commit_interval_ms: default_auto_commit_interval(),
session_timeout_ms: default_session_timeout(),
heartbeat_interval_ms: default_heartbeat_interval(),
max_poll_interval_ms: default_max_poll_interval(),
fetch_min_bytes: default_fetch_min_bytes(),
fetch_max_bytes: default_fetch_max_bytes(),
max_partition_fetch_bytes: default_max_partition_fetch_bytes(),
auto_offset_reset: default_auto_offset_reset(),
enable_partition_eof: false,
librdkafka_overrides: HashMap::new(),
extra_config: HashMap::new(),
filters_in: Vec::new(),
filters_out: Vec::new(),
}
}
}
impl KafkaConfig {
#[must_use]
pub fn production() -> Self {
Self {
profile: KafkaProfile::Production,
..Default::default()
}
}
#[must_use]
pub fn devtest() -> Self {
Self {
profile: KafkaProfile::DevTest,
ssl_skip_verify: true,
..Default::default()
}
}
#[must_use]
pub fn for_testing(brokers: &str, group: &str, topics: Vec<String>) -> Self {
Self {
profile: KafkaProfile::DevTest,
brokers: vec![brokers.to_string()],
group: group.to_string(),
topics,
ssl_skip_verify: true,
..Default::default()
}
}
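/// Sets the tuning profile. Selecting [`KafkaProfile::DevTest`] also
/// enables `ssl_skip_verify`, matching [`KafkaConfig::devtest`].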
#[must_use]
pub fn with_profile(mut self, profile: KafkaProfile) -> Self {
self.profile = profile;
if profile == KafkaProfile::DevTest {
self.ssl_skip_verify = true;
}
self
}
#[must_use]
pub fn profile_defaults(&self) -> &'static [(&'static str, &'static str)] {
match self.profile {
KafkaProfile::Production => PRODUCTION_PROFILE,
KafkaProfile::DevTest => DEVTEST_PROFILE,
}
}
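/// Flattens this config's profile into librdkafka properties.
///
/// Precedence, lowest to highest: profile defaults, the deprecated
/// `extra_config`, then `librdkafka_overrides`. The typed fields (brokers,
/// group, timeouts, TLS paths, ...) are not applied here; this method only
/// covers the profile and the override maps.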
#[must_use]
#[allow(deprecated)]
pub fn build_librdkafka_config(&self) -> HashMap<String, String> {
let mut config = HashMap::new();
for (key, value) in self.profile_defaults() {
config.insert((*key).to_string(), (*value).to_string());
}
for (key, value) in &self.extra_config {
config.insert(key.clone(), value.clone());
}
for (key, value) in &self.librdkafka_overrides {
config.insert(key.clone(), value.clone());
}
config
}
#[must_use]
pub fn with_override(mut self, key: &str, value: &str) -> Self {
self.librdkafka_overrides
.insert(key.to_string(), value.to_string());
self
}
#[must_use]
pub fn with_overrides(mut self, overrides: &[(&str, &str)]) -> Self {
for (key, value) in overrides {
self.librdkafka_overrides
.insert((*key).to_string(), (*value).to_string());
}
self
}
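/// Enables SASL/SCRAM authentication over a plaintext connection
/// (`security_protocol = "sasl_plaintext"`). Use [`Self::with_scram_ssl`]
/// for TLS transport.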
#[must_use]
pub fn with_scram(mut self, mechanism: &str, username: &str, password: &str) -> Self {
self.security_protocol = "sasl_plaintext".to_string();
self.sasl_mechanism = Some(mechanism.to_string());
self.sasl_username = Some(username.to_string());
self.sasl_password = Some(crate::SensitiveString::new(password));
self
}
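/// Enables SASL/SCRAM authentication over TLS
/// (`security_protocol = "sasl_ssl"`).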
#[must_use]
pub fn with_scram_ssl(mut self, mechanism: &str, username: &str, password: &str) -> Self {
self.security_protocol = "sasl_ssl".to_string();
self.sasl_mechanism = Some(mechanism.to_string());
self.sasl_username = Some(username.to_string());
self.sasl_password = Some(crate::SensitiveString::new(password));
self
}
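/// Upgrades the transport to TLS, preserving any SASL layer already
/// configured: `plaintext` becomes `ssl` and `sasl_plaintext` becomes
/// `sasl_ssl`. Pass `Some(path)` to pin a CA bundle; `None` leaves the CA
/// location unset.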
#[must_use]
pub fn with_tls(mut self, ca_location: Option<&str>) -> Self {
if self.security_protocol == "plaintext" {
self.security_protocol = "ssl".to_string();
} else if self.security_protocol == "sasl_plaintext" {
self.security_protocol = "sasl_ssl".to_string();
}
self.ssl_ca_location = ca_location.map(String::from);
self
}
#[must_use]
pub fn with_client_cert(mut self, cert_location: &str, key_location: &str) -> Self {
self.ssl_certificate_location = Some(cert_location.to_string());
self.ssl_key_location = Some(key_location.to_string());
self
}
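/// Disables TLS certificate verification without touching the security
/// protocol. Prefer [`Self::with_ssl_insecure`] when the protocol should be
/// upgraded to TLS at the same time.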
#[must_use]
pub fn with_ssl_skip_verify(mut self) -> Self {
self.ssl_skip_verify = true;
self
}
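/// Upgrades the transport to TLS (like [`Self::with_tls`]) and disables
/// certificate verification. For local development only.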
#[must_use]
pub fn with_ssl_insecure(mut self) -> Self {
if self.security_protocol == "plaintext" {
self.security_protocol = "ssl".to_string();
} else if self.security_protocol == "sasl_plaintext" {
self.security_protocol = "sasl_ssl".to_string();
}
self.ssl_skip_verify = true;
self
}
#[must_use]
#[deprecated(since = "2.0.0", note = "Use producer profile constants directly")]
#[allow(deprecated)]
pub fn with_producer_defaults(mut self) -> Self {
for (key, value) in PRODUCER_HIGH_THROUGHPUT {
self.extra_config
.entry((*key).to_string())
.or_insert_with(|| (*value).to_string());
}
self
}
#[must_use]
#[deprecated(since = "2.0.0", note = "Use KafkaConfig::production() instead")]
#[allow(deprecated)]
pub fn with_high_throughput(mut self) -> Self {
for (key, value) in PRODUCTION_PROFILE {
self.extra_config
.entry((*key).to_string())
.or_insert_with(|| (*value).to_string());
}
self
}
#[must_use]
#[deprecated(since = "2.0.0", note = "Use LOW_LATENCY_CONSUMER_DEFAULTS directly")]
#[allow(deprecated)]
pub fn with_low_latency(mut self) -> Self {
for (key, value) in LOW_LATENCY_CONSUMER_DEFAULTS {
self.extra_config
.entry((*key).to_string())
.or_insert_with(|| (*value).to_string());
}
self
}
#[must_use]
pub fn with_statistics(mut self, interval_ms: u32) -> Self {
self.librdkafka_overrides.insert(
"statistics.interval.ms".to_string(),
interval_ms.to_string(),
);
self
}
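/// Applies keepalive and idle-connection settings suited to managed cloud
/// brokers that aggressively close idle connections. Existing overrides for
/// the same keys are kept.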
#[must_use]
pub fn with_cloud_connection_tuning(mut self) -> Self {
let cloud_settings = [
("socket.keepalive.enable", "true"),
("metadata.max.age.ms", "180000"),
("socket.connection.setup.timeout.ms", "30000"),
("connections.max.idle.ms", "540000"),
];
for (key, value) in cloud_settings {
self.librdkafka_overrides
.entry(key.to_string())
.or_insert_with(|| value.to_string());
}
self
}
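/// Builds a config from environment variables with the given `prefix`.
///
/// Each setting is resolved from `{prefix}_{NAME}`, falling back to any
/// legacy `{prefix}_{ALIAS}` spellings and finally the bare `KAFKA_{NAME}`
/// name. For example (illustrative prefix), `MYAPP_BOOTSTRAP_SERVERS`
/// falls back to `MYAPP_BROKERS` and then `KAFKA_BOOTSTRAP_SERVERS`.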
#[cfg(feature = "config")]
#[must_use]
pub fn from_env(prefix: &str) -> Self {
use crate::config::env_compat::EnvVar;
let mut config = Self::default();
let prefixed = |name: &str, legacy: &[&str]| {
let mut var = EnvVar::new(&format!("{prefix}_{name}"));
for l in legacy {
var = var.with_legacy(&format!("{prefix}_{l}"));
}
var = var.with_legacy(&format!("KAFKA_{name}"));
var
};
if let Some(val) = prefixed("PROFILE", &[]).get()
&& let Ok(profile) = val.parse()
{
config.profile = profile;
if config.profile == KafkaProfile::DevTest {
config.ssl_skip_verify = true;
}
}
if let Some(brokers) = prefixed("BOOTSTRAP_SERVERS", &["BROKERS"]).get_list() {
config.brokers = brokers;
}
if let Some(val) = prefixed("GROUP_ID", &["GROUP", "CONSUMER_GROUP"]).get() {
config.group = val;
}
if let Some(val) = prefixed("CLIENT_ID", &[]).get() {
config.client_id = val;
}
if let Some(val) = prefixed("SECURITY_PROTOCOL", &[]).get() {
config.security_protocol = val;
}
if let Some(val) = prefixed("SASL_MECHANISM", &[]).get() {
config.sasl_mechanism = Some(val);
}
if let Some(val) = prefixed("SASL_USERNAME", &["SASL_USER"]).get() {
config.sasl_username = Some(val);
}
if let Some(val) = prefixed("SASL_PASSWORD", &[]).get() {
config.sasl_password = Some(crate::SensitiveString::from(val));
}
if let Some(val) = prefixed("SSL_CA_LOCATION", &["CA_CERT", "SSL_CA"]).get() {
config.ssl_ca_location = Some(val);
}
if let Some(val) = prefixed("SSL_SKIP_VERIFY", &["SSL_INSECURE", "INSECURE"]).get_bool() {
config.ssl_skip_verify = val;
}
if let Some(topics) = prefixed("TOPICS", &["TOPIC"]).get_list() {
config.topics = topics;
}
config
}
#[cfg(feature = "config")]
#[must_use]
pub fn from_env_standard() -> Self {
Self::from_env("KAFKA")
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn kafka_config_topic_resolution_defaults() {
let config = KafkaConfig::default();
assert!(config.topic_include.is_empty());
assert_eq!(config.topic_exclude, vec!["^__".to_string()]);
assert!(!config.auto_discover);
assert_eq!(config.topic_refresh_secs, 60);
assert_eq!(config.topic_suppression_rules.len(), 1);
assert_eq!(config.topic_suppression_rules[0].preferred_suffix, "_load");
assert_eq!(config.topic_suppression_rules[0].suppressed_suffix, "_land");
}
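// Additional sketches exercising the merge and parsing behaviour defined
// above; the key/value choices below are illustrative.
#[test]
fn kafka_profile_parses_aliases_and_round_trips() {
assert_eq!("prod".parse::<KafkaProfile>(), Ok(KafkaProfile::Production));
assert_eq!("DEV".parse::<KafkaProfile>(), Ok(KafkaProfile::DevTest));
assert!("staging".parse::<KafkaProfile>().is_err());
// Display output feeds straight back through FromStr.
assert_eq!(
KafkaProfile::Production.to_string().parse::<KafkaProfile>(),
Ok(KafkaProfile::Production)
);
}
#[test]
fn librdkafka_overrides_beat_profile_defaults() {
let config = KafkaConfig::production().with_override("fetch.min.bytes", "1");
let built = config.build_librdkafka_config();
// PRODUCTION_PROFILE sets fetch.min.bytes=1048576; the override wins.
assert_eq!(built.get("fetch.min.bytes").map(String::as_str), Some("1"));
// Keys without overrides pass through from the profile unchanged.
assert_eq!(
built.get("enable.auto.commit").map(String::as_str),
Some("false")
);
}
#[test]
fn merge_with_overrides_prefers_override_values() {
let mut overrides = HashMap::new();
overrides.insert("queued.min.messages".to_string(), "500".to_string());
let merged = merge_with_overrides(DEVTEST_PROFILE, &overrides);
assert_eq!(
merged.get("queued.min.messages").map(String::as_str),
Some("500")
);
}
#[test]
fn devtest_constructors_skip_tls_verification() {
assert!(KafkaConfig::devtest().ssl_skip_verify);
assert!(KafkaConfig::default()
.with_profile(KafkaProfile::DevTest)
.ssl_skip_verify);
}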
}