use std::time::Duration;
/// Fully-resolved Kafka client settings.
///
/// Instances are normally obtained through [`KafkaConfig::builder`], which
/// fills in defaults for every option that was not set explicitly. The
/// derived `Default` impl, by contrast, yields empty strings, `false`, zero
/// durations and zero sizes.
///
/// Note: `Debug` is derived, so formatting this struct with `{:?}` prints
/// every field verbatim, including [`password`](Self::password).
#[derive(Debug, Clone, Default)]
pub struct KafkaConfig {
    /// Broker address string (presumably a comma-separated `host:port`
    /// list; confirm against the code that consumes this config).
    pub brokers: String,
    /// User name for authentication; empty when unset.
    pub username: String,
    /// Password for authentication; empty when unset.
    pub password: String,
    /// Name of the SCRAM variant (for example `"SCRAM-SHA-256"`).
    pub sasl_type_scram_sha: String,
    /// SASL mechanism name (for example `"PLAIN"`).
    pub sasl_mechanism: String,
    /// Security protocol name (for example `"PLAINTEXT"`).
    pub security_protocol: String,
    /// Path to a certificate file; empty when unset.
    pub cert_path: String,
    /// Flag that, judging by its name, disables certificate verification
    /// (confirm against the code that consumes this config).
    pub insecure_skip_verify: bool,
    /// Timeout applied to messages.
    pub message_timeout: Duration,
    /// Maximum message size in bytes.
    pub message_max_bytes: usize,
    /// Maximum number of send retries per message.
    pub message_send_max_retries: usize,
    /// Offset reset policy name (for example `"latest"`).
    pub auto_offset_reset: String,
    /// Whether automatic offset commits are enabled.
    pub enable_auto_commit: bool,
    /// Time budget for a graceful wait (presumably during shutdown; confirm
    /// against the code that consumes this config).
    pub graceful_wait_timeout: Duration,
}
impl KafkaConfig {
    /// Starts a [`KafkaConfigBuilder`] for the given broker string.
    ///
    /// This is a convenience entry point that forwards to
    /// [`KafkaConfigBuilder::new`]; all other options start unset.
    pub fn builder(brokers: &str) -> KafkaConfigBuilder {
        KafkaConfigBuilder::new(brokers)
    }
}
/// Step-by-step builder for [`KafkaConfig`].
///
/// Options held as `Option<_>` are "unset" until a setter is called; the
/// plain `String` fields are simply empty until set. The derived `Default`
/// impl produces a builder with nothing set.
#[derive(Default, Debug)]
pub struct KafkaConfigBuilder {
    /// Broker address string.
    brokers: String,
    /// User name; empty when not provided.
    username: String,
    /// Password; empty when not provided.
    password: String,
    /// SCRAM variant name, if overridden.
    sasl_type_scram_sha: Option<String>,
    /// SASL mechanism name, if overridden.
    sasl_mechanism: Option<String>,
    /// Security protocol name, if overridden.
    security_protocol: Option<String>,
    /// Certificate file path; empty when not provided.
    cert_path: String,
    /// `insecure_skip_verify` flag, if overridden.
    insecure_skip_verify: Option<bool>,
    /// Message timeout, if overridden.
    message_timeout: Option<Duration>,
    /// Maximum message size in bytes, if overridden.
    message_max_bytes: Option<usize>,
    /// Maximum number of send retries, if overridden.
    message_send_max_retries: Option<usize>,
    /// Offset reset policy name, if overridden.
    auto_offset_reset: Option<String>,
    /// Auto-commit flag, if overridden.
    enable_auto_commit: Option<bool>,
    /// Graceful wait timeout, if overridden.
    graceful_wait_timeout: Option<Duration>,
}
impl KafkaConfigBuilder {
    /// Creates a builder for the given broker string.
    ///
    /// Every other option starts unset and receives its default in
    /// [`build`](Self::build).
    pub fn new(brokers: &str) -> Self {
        Self {
            brokers: brokers.to_string(),
            ..Default::default()
        }
    }

    /// Sets the user name.
    pub fn with_username(mut self, username: &str) -> Self {
        self.username = username.to_string();
        self
    }

    /// Sets the password.
    pub fn with_password(mut self, password: &str) -> Self {
        self.password = password.to_string();
        self
    }

    /// Overrides the SCRAM variant name (default `"SCRAM-SHA-256"`).
    pub fn with_sasl_type_scram_sha(mut self, sasl_type_scram_sha: &str) -> Self {
        self.sasl_type_scram_sha = Some(sasl_type_scram_sha.to_string());
        self
    }

    /// Overrides the SASL mechanism name (default `"PLAIN"`).
    pub fn with_sasl_mechanism(mut self, mechanism: &str) -> Self {
        self.sasl_mechanism = Some(mechanism.to_string());
        self
    }

    /// Overrides the security protocol name (default `"PLAINTEXT"`).
    pub fn with_security_protocol(mut self, protocol: &str) -> Self {
        self.security_protocol = Some(protocol.to_string());
        self
    }

    /// Sets the certificate file path.
    pub fn with_cert_path(mut self, cert_path: &str) -> Self {
        self.cert_path = cert_path.to_string();
        self
    }

    /// Overrides the `insecure_skip_verify` flag (default `false`).
    ///
    /// Without this setter the flag could never be turned on through the
    /// builder, because the underlying field is private. Judging by its
    /// name the flag disables certificate verification; confirm against the
    /// code that consumes [`KafkaConfig`] before enabling it.
    pub fn with_insecure_skip_verify(mut self, insecure_skip_verify: bool) -> Self {
        self.insecure_skip_verify = Some(insecure_skip_verify);
        self
    }

    /// Overrides the message timeout (default 10 seconds).
    pub fn with_message_timeout(mut self, timeout: Duration) -> Self {
        self.message_timeout = Some(timeout);
        self
    }

    /// Overrides the maximum message size in bytes (default 1 MiB).
    pub fn with_message_max_bytes(mut self, message_max_bytes: usize) -> Self {
        self.message_max_bytes = Some(message_max_bytes);
        self
    }

    /// Overrides the offset reset policy name (default `"latest"`).
    pub fn with_auto_offset_reset(mut self, auto_offset_reset: &str) -> Self {
        self.auto_offset_reset = Some(auto_offset_reset.to_string());
        self
    }

    /// Overrides the graceful wait timeout (default 3 seconds).
    pub fn with_graceful_wait_timeout(mut self, timeout: Duration) -> Self {
        self.graceful_wait_timeout = Some(timeout);
        self
    }

    /// Overrides the maximum number of send retries (default 3).
    pub fn with_message_send_max_retries(mut self, max_retries: usize) -> Self {
        self.message_send_max_retries = Some(max_retries);
        self
    }

    /// Overrides the auto-commit flag (default `false`).
    pub fn with_enable_auto_commit(mut self, enable_auto_commit: bool) -> Self {
        self.enable_auto_commit = Some(enable_auto_commit);
        self
    }

    /// Consumes the builder and produces the final [`KafkaConfig`].
    ///
    /// Options that were never set fall back to these defaults:
    ///
    /// | option                     | default           |
    /// |----------------------------|-------------------|
    /// | `sasl_type_scram_sha`      | `"SCRAM-SHA-256"` |
    /// | `sasl_mechanism`           | `"PLAIN"`         |
    /// | `security_protocol`        | `"PLAINTEXT"`     |
    /// | `insecure_skip_verify`     | `false`           |
    /// | `message_timeout`          | 10 s              |
    /// | `message_max_bytes`        | 1 MiB             |
    /// | `message_send_max_retries` | 3                 |
    /// | `enable_auto_commit`       | `false`           |
    /// | `auto_offset_reset`        | `"latest"`        |
    /// | `graceful_wait_timeout`    | 3 s               |
    ///
    /// `username`, `password` and `cert_path` are passed through as-is and
    /// are empty strings when not provided.
    pub fn build(self) -> KafkaConfig {
        KafkaConfig {
            brokers: self.brokers,
            username: self.username,
            password: self.password,
            // `unwrap_or_else` keeps the default `String` allocation off the
            // path where the caller already supplied a value.
            sasl_type_scram_sha: self
                .sasl_type_scram_sha
                .unwrap_or_else(|| "SCRAM-SHA-256".to_string()),
            sasl_mechanism: self
                .sasl_mechanism
                .unwrap_or_else(|| "PLAIN".to_string()),
            security_protocol: self
                .security_protocol
                .unwrap_or_else(|| "PLAINTEXT".to_string()),
            cert_path: self.cert_path,
            insecure_skip_verify: self.insecure_skip_verify.unwrap_or(false),
            message_timeout: self.message_timeout.unwrap_or(Duration::from_secs(10)),
            message_max_bytes: self.message_max_bytes.unwrap_or(1024 * 1024),
            message_send_max_retries: self.message_send_max_retries.unwrap_or(3),
            enable_auto_commit: self.enable_auto_commit.unwrap_or(false),
            auto_offset_reset: self
                .auto_offset_reset
                .unwrap_or_else(|| "latest".to_string()),
            graceful_wait_timeout: self
                .graceful_wait_timeout
                .unwrap_or(Duration::from_secs(3)),
        }
    }
}