use std::path::PathBuf;
use std::time::Duration;
use kafka_protocol::records::Compression;
use crate::constants::{READ_COMMITTED, READ_UNCOMMITTED};
/// Combination of transport security and authentication used for a connection.
///
/// The variants cover the two independent choices exposed by
/// [`SecurityProtocol::uses_tls`] and [`SecurityProtocol::uses_sasl`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityProtocol {
    /// Neither TLS nor SASL.
    Plaintext,
    /// TLS without SASL.
    Ssl,
    /// SASL without TLS.
    SaslPlaintext,
    /// SASL together with TLS.
    SaslSsl,
}

impl SecurityProtocol {
    /// Returns `true` for `Ssl` and `SaslSsl`.
    pub fn uses_tls(self) -> bool {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Ssl | Self::SaslSsl => true,
            Self::Plaintext | Self::SaslPlaintext => false,
        }
    }

    /// Returns `true` for `SaslPlaintext` and `SaslSsl`.
    pub fn uses_sasl(self) -> bool {
        match self {
            Self::SaslPlaintext | Self::SaslSsl => true,
            Self::Plaintext | Self::Ssl => false,
        }
    }

    /// Returns the SASL-enabled counterpart of `self`, keeping the TLS choice.
    fn with_sasl(self) -> Self {
        match self {
            Self::Ssl | Self::SaslSsl => Self::SaslSsl,
            Self::Plaintext | Self::SaslPlaintext => Self::SaslPlaintext,
        }
    }
}
/// SASL mechanism selected for authentication. `Plain` is the default.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum SaslMechanism {
    /// The `PLAIN` mechanism.
    #[default]
    Plain,
    /// The `SCRAM-SHA-256` mechanism.
    ScramSha256,
    /// The `SCRAM-SHA-512` mechanism.
    ScramSha512,
}

impl SaslMechanism {
    /// Returns the mechanism name as an upper-case string constant.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::ScramSha512 => "SCRAM-SHA-512",
            Self::ScramSha256 => "SCRAM-SHA-256",
            Self::Plain => "PLAIN",
        }
    }

    /// Returns `true` for the two SCRAM mechanisms.
    pub fn is_scram(self) -> bool {
        // Exactly the SCRAM variants carry a numeric type code.
        self.scram_type().is_some()
    }

    /// Returns the numeric code of a SCRAM mechanism (`1` for SHA-256, `2` for
    /// SHA-512) and `None` for `Plain`.
    ///
    /// NOTE(review): presumably these are the Kafka protocol's SCRAM mechanism
    /// ids — confirm against the request encoder.
    pub fn scram_type(self) -> Option<i8> {
        match self {
            Self::ScramSha256 => Some(1),
            Self::ScramSha512 => Some(2),
            Self::Plain => None,
        }
    }
}
/// SASL mechanism and credentials.
///
/// `Debug` is implemented by hand rather than derived so that the password is
/// never written to logs or panic messages; every other field is printed as-is.
#[derive(Clone, Default, PartialEq, Eq)]
pub struct SaslConfig {
    /// Mechanism to authenticate with.
    pub mechanism: SaslMechanism,
    /// User name, if one has been configured.
    pub username: Option<String>,
    /// Password, if one has been configured. Redacted in `Debug` output.
    pub password: Option<String>,
    /// Optional authorization identity, set via
    /// [`SaslConfig::with_authorization_id`].
    pub authorization_id: Option<String>,
}

impl std::fmt::Debug for SaslConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the `Some`/`None` shape so it stays visible whether a password
        // was configured, without revealing its value.
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("SaslConfig")
            .field("mechanism", &self.mechanism)
            .field("username", &self.username)
            .field("password", &password)
            .field("authorization_id", &self.authorization_id)
            .finish()
    }
}

impl SaslConfig {
    /// Shared constructor: stores the credentials for `mechanism` and leaves
    /// the authorization id unset.
    fn from_credentials(
        mechanism: SaslMechanism,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            mechanism,
            username: Some(username.into()),
            password: Some(password.into()),
            authorization_id: None,
        }
    }

    /// Creates a configuration for the `PLAIN` mechanism.
    pub fn plain(username: impl Into<String>, password: impl Into<String>) -> Self {
        Self::from_credentials(SaslMechanism::Plain, username, password)
    }

    /// Creates a configuration for the `SCRAM-SHA-256` mechanism.
    pub fn scram_sha_256(username: impl Into<String>, password: impl Into<String>) -> Self {
        Self::from_credentials(SaslMechanism::ScramSha256, username, password)
    }

    /// Creates a configuration for the `SCRAM-SHA-512` mechanism.
    pub fn scram_sha_512(username: impl Into<String>, password: impl Into<String>) -> Self {
        Self::from_credentials(SaslMechanism::ScramSha512, username, password)
    }

    /// Sets the authorization id and returns the updated configuration.
    pub fn with_authorization_id(mut self, authorization_id: impl Into<String>) -> Self {
        self.authorization_id = Some(authorization_id.into());
        self
    }
}
/// TLS-related settings. Every field is optional and defaults to `None`.
///
/// This type only stores the values; how they are consumed is decided by the
/// connection code outside this file.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct TlsConfig {
    /// Path to a CA certificate file, if configured.
    pub ca_cert_path: Option<PathBuf>,
    /// Path to a client certificate file, if configured.
    pub client_cert_path: Option<PathBuf>,
    /// Path to a client private key file, if configured.
    pub client_key_path: Option<PathBuf>,
    /// Server name override, if configured.
    /// NOTE(review): presumably used for hostname verification/SNI — confirm.
    pub server_name: Option<String>,
}

impl TlsConfig {
    /// Creates a configuration with no field set (same as `Default`).
    pub fn new() -> Self {
        Self {
            ca_cert_path: None,
            client_cert_path: None,
            client_key_path: None,
            server_name: None,
        }
    }

    /// Returns the configuration with the CA certificate path replaced.
    pub fn with_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            ca_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with the client certificate path replaced.
    pub fn with_client_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_cert_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with the client key path replaced.
    pub fn with_client_key_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            client_key_path: Some(path.into()),
            ..self
        }
    }

    /// Returns the configuration with the server name replaced.
    pub fn with_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            server_name: Some(server_name.into()),
            ..self
        }
    }
}
/// Partitioner selection stored in `ProducerConfig::partitioner`.
///
/// Only one choice is defined here; the partitioning logic itself lives
/// outside this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerPartitioner {
/// The single available choice; `ProducerConfig::new` selects it.
Default,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProducerCompression {
None,
Gzip,
Snappy,
Lz4,
Zstd,
}
impl From<ProducerCompression> for Compression {
fn from(value: ProducerCompression) -> Self {
match value {
ProducerCompression::None => Compression::None,
ProducerCompression::Gzip => Compression::Gzip,
ProducerCompression::Snappy => Compression::Snappy,
ProducerCompression::Lz4 => Compression::Lz4,
ProducerCompression::Zstd => Compression::Zstd,
}
}
}
/// Producer configuration assembled with chained `with_*` builders.
///
/// [`ProducerConfig::new`] supplies every default. Builders consume `self`
/// and return the updated value. Some builders adjust related fields (noted
/// on each), so the order of calls can matter.
#[derive(Debug, Clone)]
pub struct ProducerConfig {
    /// Broker addresses to bootstrap from.
    pub bootstrap_servers: Vec<String>,
    /// Client id; defaults to `"rust-producer"`.
    pub client_id: String,
    /// Security protocol; defaults to `Plaintext` and is upgraded by the TLS
    /// and SASL builders.
    pub security_protocol: SecurityProtocol,
    /// TLS settings.
    pub tls: TlsConfig,
    /// SASL settings.
    pub sasl: SaslConfig,
    /// Acknowledgement setting; defaults to `1`. The idempotence and
    /// transaction builders set it to `-1`.
    pub acks: i16,
    /// Whether idempotence was requested; defaults to `false`.
    pub enable_idempotence: bool,
    /// Partitioner selection.
    pub partitioner: ProducerPartitioner,
    /// Compression codec; defaults to `None`.
    pub compression: ProducerCompression,
    /// Batch size; defaults to `16 * 1024` and is never stored below `1` by
    /// [`ProducerConfig::with_batch_size`].
    pub batch_size: usize,
    /// Linger duration; defaults to zero.
    pub linger: Duration,
    /// Delivery timeout; defaults to 120 seconds.
    pub delivery_timeout: Duration,
    /// Request timeout; defaults to 5 seconds.
    pub request_timeout: Duration,
    /// Maximum metadata age; defaults to 30 seconds.
    pub metadata_max_age: Duration,
    /// Backoff between retries; defaults to 250 milliseconds.
    pub retry_backoff: Duration,
    /// Maximum number of retries; defaults to `3`.
    pub max_retries: usize,
    /// Transactional id, if the producer is transactional.
    pub transactional_id: Option<String>,
    /// Transaction timeout; defaults to 30 seconds.
    pub transaction_timeout: Duration,
}

impl ProducerConfig {
    /// Creates a configuration for a single bootstrap server with all other
    /// fields at their defaults.
    pub fn new(bootstrap_server: impl Into<String>) -> Self {
        Self {
            bootstrap_servers: vec![bootstrap_server.into()],
            client_id: "rust-producer".to_owned(),
            security_protocol: SecurityProtocol::Plaintext,
            tls: TlsConfig::default(),
            sasl: SaslConfig::default(),
            acks: 1,
            enable_idempotence: false,
            partitioner: ProducerPartitioner::Default,
            compression: ProducerCompression::None,
            batch_size: 16 * 1024,
            linger: Duration::ZERO,
            delivery_timeout: Duration::from_secs(120),
            request_timeout: Duration::from_secs(5),
            metadata_max_age: Duration::from_secs(30),
            retry_backoff: Duration::from_millis(250),
            max_retries: 3,
            transactional_id: None,
            transaction_timeout: Duration::from_secs(30),
        }
    }

    /// Returns the TLS counterpart of `current`, keeping SASL if it is in use.
    fn tls_variant(current: SecurityProtocol) -> SecurityProtocol {
        if current.uses_sasl() {
            SecurityProtocol::SaslSsl
        } else {
            SecurityProtocol::Ssl
        }
    }

    /// Applies the settings that idempotent delivery depends on: `acks = -1`
    /// and at least one retry.
    fn apply_idempotence_requirements(&mut self) {
        self.acks = -1;
        self.max_retries = self.max_retries.max(1);
    }

    /// Sets the client id.
    pub fn with_client_id(self, client_id: impl Into<String>) -> Self {
        Self {
            client_id: client_id.into(),
            ..self
        }
    }

    /// Replaces the whole bootstrap server list.
    pub fn with_bootstrap_servers(
        self,
        servers: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        Self {
            bootstrap_servers: servers.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Sets the security protocol verbatim.
    pub fn with_security_protocol(self, security_protocol: SecurityProtocol) -> Self {
        Self {
            security_protocol,
            ..self
        }
    }

    /// Replaces the TLS settings and switches the protocol to its TLS variant.
    pub fn with_tls(self, tls: TlsConfig) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls,
            ..self
        }
    }

    /// Sets the CA certificate path and switches the protocol to its TLS
    /// variant.
    pub fn with_tls_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self.tls.with_ca_cert_path(path),
            ..self
        }
    }

    /// Sets the client certificate and key paths and switches the protocol to
    /// its TLS variant.
    pub fn with_tls_client_auth_paths(
        self,
        cert_path: impl Into<PathBuf>,
        key_path: impl Into<PathBuf>,
    ) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self
                .tls
                .with_client_cert_path(cert_path)
                .with_client_key_path(key_path),
            ..self
        }
    }

    /// Sets the TLS server name and switches the protocol to its TLS variant.
    pub fn with_tls_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self.tls.with_server_name(server_name),
            ..self
        }
    }

    /// Replaces the SASL settings and switches the protocol to its SASL
    /// variant, keeping TLS if it is in use.
    pub fn with_sasl(self, sasl: SaslConfig) -> Self {
        Self {
            security_protocol: self.security_protocol.with_sasl(),
            sasl,
            ..self
        }
    }

    /// Shorthand for [`ProducerConfig::with_sasl`] with `PLAIN` credentials.
    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
        let credentials = SaslConfig::plain(username, password);
        self.with_sasl(credentials)
    }

    /// Shorthand for [`ProducerConfig::with_sasl`] with `SCRAM-SHA-256`
    /// credentials.
    pub fn with_sasl_scram_sha_256(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        let credentials = SaslConfig::scram_sha_256(username, password);
        self.with_sasl(credentials)
    }

    /// Shorthand for [`ProducerConfig::with_sasl`] with `SCRAM-SHA-512`
    /// credentials.
    pub fn with_sasl_scram_sha_512(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        let credentials = SaslConfig::scram_sha_512(username, password);
        self.with_sasl(credentials)
    }

    /// Sets `acks` verbatim.
    pub fn with_acks(self, acks: i16) -> Self {
        Self { acks, ..self }
    }

    /// Sets the idempotence flag. Enabling it also forces `acks = -1` and
    /// raises `max_retries` to at least `1`; disabling it changes nothing else.
    pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
        self.enable_idempotence = enable_idempotence;
        if enable_idempotence {
            self.apply_idempotence_requirements();
        }
        self
    }

    /// Sets the partitioner.
    pub fn with_partitioner(self, partitioner: ProducerPartitioner) -> Self {
        Self {
            partitioner,
            ..self
        }
    }

    /// Sets the compression codec.
    pub fn with_compression(self, compression: ProducerCompression) -> Self {
        Self {
            compression,
            ..self
        }
    }

    /// Sets the batch size, clamping `0` up to `1`.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self {
            batch_size: batch_size.max(1),
            ..self
        }
    }

    /// Sets the linger duration.
    pub fn with_linger(self, linger: Duration) -> Self {
        Self { linger, ..self }
    }

    /// Sets the delivery timeout.
    pub fn with_delivery_timeout(self, delivery_timeout: Duration) -> Self {
        Self {
            delivery_timeout,
            ..self
        }
    }

    /// Sets the request timeout.
    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
        Self {
            request_timeout,
            ..self
        }
    }

    /// Sets the maximum metadata age.
    pub fn with_metadata_max_age(self, metadata_max_age: Duration) -> Self {
        Self {
            metadata_max_age,
            ..self
        }
    }

    /// Sets the retry backoff.
    pub fn with_retry_backoff(self, retry_backoff: Duration) -> Self {
        Self {
            retry_backoff,
            ..self
        }
    }

    /// Sets the maximum number of retries verbatim.
    pub fn with_max_retries(self, max_retries: usize) -> Self {
        Self {
            max_retries,
            ..self
        }
    }

    /// Makes the producer transactional.
    ///
    /// Transactions build on idempotent delivery, so this enables idempotence
    /// with the same adjustments as `with_enable_idempotence(true)`:
    /// `acks = -1` and `max_retries` of at least `1`.
    pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
        self.transactional_id = Some(transactional_id.into());
        self.enable_idempotence = true;
        self.apply_idempotence_requirements();
        self
    }

    /// Sets the transaction timeout.
    pub fn with_transaction_timeout(self, transaction_timeout: Duration) -> Self {
        Self {
            transaction_timeout,
            ..self
        }
    }

    /// Returns `true` when a transactional id is configured.
    pub fn is_transactional(&self) -> bool {
        self.transactional_id.is_some()
    }

    /// Returns `true` when idempotent delivery is in effect: either requested
    /// explicitly or implied by a configured transactional id.
    pub fn is_idempotent(&self) -> bool {
        // A transactional producer is idempotent even if the flag was cleared
        // afterwards through the builder or the public field.
        self.enable_idempotence || self.is_transactional()
    }
}
/// Alias that exposes [`ProducerConfig`] under the name `ClientConfig`.
pub type ClientConfig = ProducerConfig;
/// Admin client configuration assembled with chained `with_*` builders.
///
/// [`AdminConfig::new`] supplies every default; builders consume `self` and
/// return the updated value.
#[derive(Debug, Clone)]
pub struct AdminConfig {
    /// Broker addresses to bootstrap from.
    pub bootstrap_servers: Vec<String>,
    /// Client id; defaults to `"rust-admin"`.
    pub client_id: String,
    /// Security protocol; defaults to `Plaintext` and is upgraded by the TLS
    /// and SASL builders.
    pub security_protocol: SecurityProtocol,
    /// TLS settings.
    pub tls: TlsConfig,
    /// SASL settings.
    pub sasl: SaslConfig,
    /// Request timeout; defaults to 5 seconds.
    pub request_timeout: Duration,
}

impl AdminConfig {
    /// Creates a configuration for a single bootstrap server with all other
    /// fields at their defaults.
    pub fn new(bootstrap_server: impl Into<String>) -> Self {
        let bootstrap_servers = vec![bootstrap_server.into()];
        Self {
            bootstrap_servers,
            client_id: "rust-admin".to_owned(),
            security_protocol: SecurityProtocol::Plaintext,
            tls: TlsConfig::default(),
            sasl: SaslConfig::default(),
            request_timeout: Duration::from_secs(5),
        }
    }

    /// Returns the TLS counterpart of `current`, keeping SASL if it is in use.
    fn tls_variant(current: SecurityProtocol) -> SecurityProtocol {
        if current.uses_sasl() {
            SecurityProtocol::SaslSsl
        } else {
            SecurityProtocol::Ssl
        }
    }

    /// Sets the client id.
    pub fn with_client_id(self, client_id: impl Into<String>) -> Self {
        Self {
            client_id: client_id.into(),
            ..self
        }
    }

    /// Replaces the whole bootstrap server list.
    pub fn with_bootstrap_servers(
        self,
        servers: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        Self {
            bootstrap_servers: servers.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Sets the security protocol verbatim.
    pub fn with_security_protocol(self, security_protocol: SecurityProtocol) -> Self {
        Self {
            security_protocol,
            ..self
        }
    }

    /// Replaces the TLS settings and switches the protocol to its TLS variant.
    pub fn with_tls(self, tls: TlsConfig) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls,
            ..self
        }
    }

    /// Sets the CA certificate path and switches the protocol to its TLS
    /// variant.
    pub fn with_tls_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self.tls.with_ca_cert_path(path),
            ..self
        }
    }

    /// Sets the client certificate and key paths and switches the protocol to
    /// its TLS variant.
    pub fn with_tls_client_auth_paths(
        self,
        cert_path: impl Into<PathBuf>,
        key_path: impl Into<PathBuf>,
    ) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self
                .tls
                .with_client_cert_path(cert_path)
                .with_client_key_path(key_path),
            ..self
        }
    }

    /// Sets the TLS server name and switches the protocol to its TLS variant.
    pub fn with_tls_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self.tls.with_server_name(server_name),
            ..self
        }
    }

    /// Replaces the SASL settings and switches the protocol to its SASL
    /// variant, keeping TLS if it is in use.
    pub fn with_sasl(self, sasl: SaslConfig) -> Self {
        Self {
            security_protocol: self.security_protocol.with_sasl(),
            sasl,
            ..self
        }
    }

    /// Shorthand for [`AdminConfig::with_sasl`] with `PLAIN` credentials.
    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
        let credentials = SaslConfig::plain(username, password);
        self.with_sasl(credentials)
    }

    /// Shorthand for [`AdminConfig::with_sasl`] with `SCRAM-SHA-256`
    /// credentials.
    pub fn with_sasl_scram_sha_256(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        let credentials = SaslConfig::scram_sha_256(username, password);
        self.with_sasl(credentials)
    }

    /// Shorthand for [`AdminConfig::with_sasl`] with `SCRAM-SHA-512`
    /// credentials.
    pub fn with_sasl_scram_sha_512(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        let credentials = SaslConfig::scram_sha_512(username, password);
        self.with_sasl(credentials)
    }

    /// Sets the request timeout.
    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
        Self {
            request_timeout,
            ..self
        }
    }
}
/// Offset reset choice stored in `ConsumerConfig::auto_offset_reset`.
///
/// NOTE(review): presumably applied when no usable committed offset exists —
/// confirm against the consumer implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AutoOffsetReset {
/// Earliest offset; this is what `ConsumerConfig::new` selects.
Earliest,
/// Latest offset.
Latest,
}
/// Isolation level stored in `ConsumerConfig::isolation_level`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum IsolationLevel {
    /// Maps to the `READ_UNCOMMITTED` constant.
    ReadUncommitted,
    /// Maps to the `READ_COMMITTED` constant.
    ReadCommitted,
}

impl IsolationLevel {
    /// Returns the numeric value for this level, taken from the
    /// `READ_UNCOMMITTED` / `READ_COMMITTED` constants in `crate::constants`.
    pub fn as_protocol_value(self) -> i8 {
        match self {
            Self::ReadCommitted => READ_COMMITTED,
            Self::ReadUncommitted => READ_UNCOMMITTED,
        }
    }
}
/// Consumer configuration assembled with chained `with_*` builders.
///
/// [`ConsumerConfig::new`] supplies every default; builders consume `self`
/// and return the updated value. Numeric and duration setters store their
/// argument verbatim.
#[derive(Debug, Clone)]
pub struct ConsumerConfig {
    /// Broker addresses to bootstrap from.
    pub bootstrap_servers: Vec<String>,
    /// Client id; defaults to `"rust-consumer"`.
    pub client_id: String,
    /// Consumer group id supplied to [`ConsumerConfig::new`].
    pub group_id: String,
    /// Security protocol; defaults to `Plaintext` and is upgraded by the TLS
    /// and SASL builders.
    pub security_protocol: SecurityProtocol,
    /// TLS settings.
    pub tls: TlsConfig,
    /// SASL settings.
    pub sasl: SaslConfig,
    /// Request timeout; defaults to 5 seconds.
    pub request_timeout: Duration,
    /// Maximum metadata age; defaults to 30 seconds.
    pub metadata_max_age: Duration,
    /// Backoff between retries; defaults to 250 milliseconds.
    pub retry_backoff: Duration,
    /// Maximum number of retries; defaults to `3`.
    pub max_retries: usize,
    /// Rebalance timeout; defaults to 30 seconds.
    pub rebalance_timeout: Duration,
    /// Maximum fetch wait; defaults to 500 milliseconds.
    pub fetch_max_wait: Duration,
    /// Minimum fetch size in bytes; defaults to `1`.
    pub fetch_min_bytes: i32,
    /// Maximum fetch size in bytes; defaults to `50 * 1024 * 1024`.
    pub fetch_max_bytes: i32,
    /// Maximum bytes per partition; defaults to `1024 * 1024`.
    pub partition_max_bytes: i32,
    /// Offset reset choice; defaults to `Earliest`.
    pub auto_offset_reset: AutoOffsetReset,
    /// Isolation level; defaults to `ReadUncommitted`.
    pub isolation_level: IsolationLevel,
    /// Whether auto commit is enabled; defaults to `false`.
    pub enable_auto_commit: bool,
    /// Auto commit interval; defaults to 5 seconds.
    pub auto_commit_interval: Duration,
    /// Server-side assignor name, if configured.
    pub server_assignor: Option<String>,
    /// Rack id, if configured.
    pub rack_id: Option<String>,
    /// Instance id, if configured.
    pub instance_id: Option<String>,
}

impl ConsumerConfig {
    /// Creates a configuration for a single bootstrap server and the given
    /// group, with all other fields at their defaults.
    pub fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
        let bootstrap_servers = vec![bootstrap_server.into()];
        Self {
            bootstrap_servers,
            client_id: "rust-consumer".to_owned(),
            group_id: group_id.into(),
            security_protocol: SecurityProtocol::Plaintext,
            tls: TlsConfig::default(),
            sasl: SaslConfig::default(),
            request_timeout: Duration::from_secs(5),
            metadata_max_age: Duration::from_secs(30),
            retry_backoff: Duration::from_millis(250),
            max_retries: 3,
            rebalance_timeout: Duration::from_secs(30),
            fetch_max_wait: Duration::from_millis(500),
            fetch_min_bytes: 1,
            fetch_max_bytes: 50 * 1024 * 1024,
            partition_max_bytes: 1024 * 1024,
            auto_offset_reset: AutoOffsetReset::Earliest,
            isolation_level: IsolationLevel::ReadUncommitted,
            enable_auto_commit: false,
            auto_commit_interval: Duration::from_secs(5),
            server_assignor: None,
            rack_id: None,
            instance_id: None,
        }
    }

    /// Returns the TLS counterpart of `current`, keeping SASL if it is in use.
    fn tls_variant(current: SecurityProtocol) -> SecurityProtocol {
        if current.uses_sasl() {
            SecurityProtocol::SaslSsl
        } else {
            SecurityProtocol::Ssl
        }
    }

    /// Sets the client id.
    pub fn with_client_id(self, client_id: impl Into<String>) -> Self {
        Self {
            client_id: client_id.into(),
            ..self
        }
    }

    /// Replaces the whole bootstrap server list.
    pub fn with_bootstrap_servers(
        self,
        servers: impl IntoIterator<Item = impl Into<String>>,
    ) -> Self {
        Self {
            bootstrap_servers: servers.into_iter().map(Into::into).collect(),
            ..self
        }
    }

    /// Sets the security protocol verbatim.
    pub fn with_security_protocol(self, security_protocol: SecurityProtocol) -> Self {
        Self {
            security_protocol,
            ..self
        }
    }

    /// Replaces the TLS settings and switches the protocol to its TLS variant.
    pub fn with_tls(self, tls: TlsConfig) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls,
            ..self
        }
    }

    /// Sets the CA certificate path and switches the protocol to its TLS
    /// variant.
    pub fn with_tls_ca_cert_path(self, path: impl Into<PathBuf>) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self.tls.with_ca_cert_path(path),
            ..self
        }
    }

    /// Sets the client certificate and key paths and switches the protocol to
    /// its TLS variant.
    pub fn with_tls_client_auth_paths(
        self,
        cert_path: impl Into<PathBuf>,
        key_path: impl Into<PathBuf>,
    ) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self
                .tls
                .with_client_cert_path(cert_path)
                .with_client_key_path(key_path),
            ..self
        }
    }

    /// Sets the TLS server name and switches the protocol to its TLS variant.
    pub fn with_tls_server_name(self, server_name: impl Into<String>) -> Self {
        Self {
            security_protocol: Self::tls_variant(self.security_protocol),
            tls: self.tls.with_server_name(server_name),
            ..self
        }
    }

    /// Replaces the SASL settings and switches the protocol to its SASL
    /// variant, keeping TLS if it is in use.
    pub fn with_sasl(self, sasl: SaslConfig) -> Self {
        Self {
            security_protocol: self.security_protocol.with_sasl(),
            sasl,
            ..self
        }
    }

    /// Shorthand for [`ConsumerConfig::with_sasl`] with `PLAIN` credentials.
    pub fn with_sasl_plain(self, username: impl Into<String>, password: impl Into<String>) -> Self {
        let credentials = SaslConfig::plain(username, password);
        self.with_sasl(credentials)
    }

    /// Shorthand for [`ConsumerConfig::with_sasl`] with `SCRAM-SHA-256`
    /// credentials.
    pub fn with_sasl_scram_sha_256(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        let credentials = SaslConfig::scram_sha_256(username, password);
        self.with_sasl(credentials)
    }

    /// Shorthand for [`ConsumerConfig::with_sasl`] with `SCRAM-SHA-512`
    /// credentials.
    pub fn with_sasl_scram_sha_512(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        let credentials = SaslConfig::scram_sha_512(username, password);
        self.with_sasl(credentials)
    }

    /// Sets the request timeout.
    pub fn with_request_timeout(self, request_timeout: Duration) -> Self {
        Self {
            request_timeout,
            ..self
        }
    }

    /// Sets the maximum metadata age.
    pub fn with_metadata_max_age(self, metadata_max_age: Duration) -> Self {
        Self {
            metadata_max_age,
            ..self
        }
    }

    /// Sets the retry backoff.
    pub fn with_retry_backoff(self, retry_backoff: Duration) -> Self {
        Self {
            retry_backoff,
            ..self
        }
    }

    /// Sets the maximum number of retries.
    pub fn with_max_retries(self, max_retries: usize) -> Self {
        Self {
            max_retries,
            ..self
        }
    }

    /// Sets the rebalance timeout.
    pub fn with_rebalance_timeout(self, rebalance_timeout: Duration) -> Self {
        Self {
            rebalance_timeout,
            ..self
        }
    }

    /// Sets the maximum fetch wait.
    pub fn with_fetch_max_wait(self, fetch_max_wait: Duration) -> Self {
        Self {
            fetch_max_wait,
            ..self
        }
    }

    /// Sets the minimum fetch size in bytes.
    pub fn with_fetch_min_bytes(self, fetch_min_bytes: i32) -> Self {
        Self {
            fetch_min_bytes,
            ..self
        }
    }

    /// Sets the maximum fetch size in bytes.
    pub fn with_fetch_max_bytes(self, fetch_max_bytes: i32) -> Self {
        Self {
            fetch_max_bytes,
            ..self
        }
    }

    /// Sets the maximum bytes per partition.
    pub fn with_partition_max_bytes(self, partition_max_bytes: i32) -> Self {
        Self {
            partition_max_bytes,
            ..self
        }
    }

    /// Sets the offset reset choice.
    pub fn with_auto_offset_reset(self, auto_offset_reset: AutoOffsetReset) -> Self {
        Self {
            auto_offset_reset,
            ..self
        }
    }

    /// Sets the isolation level.
    pub fn with_isolation_level(self, isolation_level: IsolationLevel) -> Self {
        Self {
            isolation_level,
            ..self
        }
    }

    /// Enables or disables auto commit.
    pub fn with_enable_auto_commit(self, enable_auto_commit: bool) -> Self {
        Self {
            enable_auto_commit,
            ..self
        }
    }

    /// Sets the auto commit interval.
    pub fn with_auto_commit_interval(self, auto_commit_interval: Duration) -> Self {
        Self {
            auto_commit_interval,
            ..self
        }
    }

    /// Sets the server-side assignor name.
    pub fn with_server_assignor(self, server_assignor: impl Into<String>) -> Self {
        Self {
            server_assignor: Some(server_assignor.into()),
            ..self
        }
    }

    /// Sets the rack id.
    pub fn with_rack_id(self, rack_id: impl Into<String>) -> Self {
        Self {
            rack_id: Some(rack_id.into()),
            ..self
        }
    }

    /// Sets the instance id.
    pub fn with_instance_id(self, instance_id: impl Into<String>) -> Self {
        Self {
            instance_id: Some(instance_id.into()),
            ..self
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// Builds the `Option<&Path>` shape that `Option<PathBuf>::as_deref` yields.
    fn some_path(raw: &str) -> Option<&Path> {
        Some(Path::new(raw))
    }

    // Setting any TLS option on a plaintext producer switches it to `Ssl`.
    #[test]
    fn producer_tls_builder_enables_ssl() {
        let producer =
            ProducerConfig::new("localhost:9093").with_tls_ca_cert_path("/tmp/cluster-ca.pem");

        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            producer.tls.ca_cert_path.as_deref(),
            some_path("/tmp/cluster-ca.pem")
        );
    }

    // SASL/PLAIN on a plaintext producer yields `SaslPlaintext`.
    #[test]
    fn producer_sasl_plain_builder_enables_sasl_plaintext() {
        let producer = ProducerConfig::new("localhost:9092").with_sasl_plain("user-a", "secret-a");
        let sasl = &producer.sasl;

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(sasl.mechanism, SaslMechanism::Plain);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    // SCRAM on a plaintext producer also yields `SaslPlaintext`.
    #[test]
    fn producer_sasl_scram_builder_enables_sasl_plaintext() {
        let producer =
            ProducerConfig::new("localhost:9092").with_sasl_scram_sha_256("user-a", "secret-a");
        let sasl = &producer.sasl;

        assert_eq!(producer.security_protocol, SecurityProtocol::SaslPlaintext);
        assert_eq!(sasl.mechanism, SaslMechanism::ScramSha256);
        assert_eq!(sasl.username.as_deref(), Some("user-a"));
        assert_eq!(sasl.password.as_deref(), Some("secret-a"));
    }

    // TLS and SASL combine into `SaslSsl` regardless of call order.
    #[test]
    fn producer_tls_and_sasl_builders_enable_sasl_ssl() {
        let sasl_first = ProducerConfig::new("localhost:9093")
            .with_sasl_plain("user-a", "secret-a")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem");
        assert_eq!(sasl_first.security_protocol, SecurityProtocol::SaslSsl);

        let tls_first = ProducerConfig::new("localhost:9093")
            .with_tls_ca_cert_path("/tmp/cluster-ca.pem")
            .with_sasl_plain("user-a", "secret-a");
        assert_eq!(tls_first.security_protocol, SecurityProtocol::SaslSsl);
    }

    #[test]
    fn producer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092", "host3:9092"];
        let producer = ProducerConfig::new("host1:9092").with_bootstrap_servers(brokers);

        assert_eq!(producer.bootstrap_servers, brokers);
    }

    #[test]
    fn producer_builder_records_tuning_and_transaction_options() {
        let producer = ProducerConfig::new("localhost:9092")
            .with_client_id("producer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_tls_server_name("kafka.internal")
            .with_acks(-1)
            .with_enable_idempotence(true)
            .with_partitioner(ProducerPartitioner::Default)
            .with_compression(ProducerCompression::Zstd)
            .with_batch_size(0)
            .with_linger(Duration::from_millis(50))
            .with_delivery_timeout(Duration::from_secs(10))
            .with_request_timeout(Duration::from_secs(2))
            .with_metadata_max_age(Duration::from_secs(60))
            .with_retry_backoff(Duration::from_millis(75))
            .with_max_retries(7)
            .with_transactional_id("tx-a")
            .with_transaction_timeout(Duration::from_secs(45));

        // Identity and security.
        assert_eq!(producer.client_id, "producer-a");
        assert_eq!(producer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            producer.tls.client_cert_path.as_deref(),
            some_path("/tmp/client.crt")
        );
        assert_eq!(
            producer.tls.client_key_path.as_deref(),
            some_path("/tmp/client.key")
        );
        assert_eq!(producer.tls.server_name.as_deref(), Some("kafka.internal"));

        // Delivery tuning; a batch size of 0 is clamped to 1.
        assert_eq!(producer.acks, -1);
        assert!(producer.is_idempotent());
        assert_eq!(producer.compression, ProducerCompression::Zstd);
        assert_eq!(producer.batch_size, 1);
        assert_eq!(producer.linger, Duration::from_millis(50));
        assert_eq!(producer.delivery_timeout, Duration::from_secs(10));
        assert_eq!(producer.request_timeout, Duration::from_secs(2));
        assert_eq!(producer.metadata_max_age, Duration::from_secs(60));
        assert_eq!(producer.retry_backoff, Duration::from_millis(75));
        assert_eq!(producer.max_retries, 7);

        // Transactions.
        assert_eq!(producer.transactional_id.as_deref(), Some("tx-a"));
        assert_eq!(producer.transaction_timeout, Duration::from_secs(45));
        assert!(producer.is_transactional());
    }

    #[test]
    fn consumer_tls_builder_keeps_override_server_name() {
        let tls = TlsConfig::new().with_server_name("kafka.internal");
        let consumer = ConsumerConfig::new("localhost:9093", "group-a").with_tls(tls);

        assert_eq!(consumer.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(consumer.tls.server_name.as_deref(), Some("kafka.internal"));
    }

    #[test]
    fn consumer_multi_broker_with_bootstrap_servers() {
        let brokers = ["host1:9092", "host2:9092"];
        let consumer = ConsumerConfig::new("host1:9092", "group-a").with_bootstrap_servers(brokers);

        assert_eq!(consumer.bootstrap_servers, brokers);
    }

    #[test]
    fn admin_tls_builder_records_client_auth_paths() {
        let admin = AdminConfig::new("localhost:9093")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key");

        assert_eq!(admin.security_protocol, SecurityProtocol::Ssl);
        assert_eq!(
            admin.tls.client_cert_path.as_deref(),
            some_path("/tmp/client.crt")
        );
        assert_eq!(
            admin.tls.client_key_path.as_deref(),
            some_path("/tmp/client.key")
        );
    }

    #[test]
    fn admin_and_consumer_builders_record_remaining_options() {
        let admin_tls = TlsConfig::new().with_ca_cert_path("/tmp/ca.pem");
        let admin = AdminConfig::new("localhost:9092")
            .with_client_id("admin-a")
            .with_bootstrap_servers(["host-a:9092", "host-b:9092"])
            .with_security_protocol(SecurityProtocol::Plaintext)
            .with_tls(admin_tls)
            .with_sasl_scram_sha_512("user-a", "secret-a")
            .with_request_timeout(Duration::from_secs(8));

        assert_eq!(admin.bootstrap_servers, ["host-a:9092", "host-b:9092"]);
        assert_eq!(admin.client_id, "admin-a");
        assert_eq!(admin.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(admin.tls.ca_cert_path.as_deref(), some_path("/tmp/ca.pem"));
        assert_eq!(admin.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(admin.request_timeout, Duration::from_secs(8));

        let consumer = ConsumerConfig::new("localhost:9092", "group-a")
            .with_client_id("consumer-a")
            .with_security_protocol(SecurityProtocol::Ssl)
            .with_tls_ca_cert_path("/tmp/ca.pem")
            .with_tls_client_auth_paths("/tmp/client.crt", "/tmp/client.key")
            .with_sasl_scram_sha_512("user-b", "secret-b")
            .with_request_timeout(Duration::from_secs(3))
            .with_metadata_max_age(Duration::from_secs(4))
            .with_retry_backoff(Duration::from_millis(5))
            .with_max_retries(6)
            .with_rebalance_timeout(Duration::from_secs(7))
            .with_fetch_max_wait(Duration::from_millis(8))
            .with_fetch_min_bytes(9)
            .with_fetch_max_bytes(10)
            .with_partition_max_bytes(11)
            .with_auto_offset_reset(AutoOffsetReset::Latest)
            .with_isolation_level(IsolationLevel::ReadCommitted)
            .with_enable_auto_commit(true)
            .with_auto_commit_interval(Duration::from_secs(12))
            .with_server_assignor("range")
            .with_rack_id("rack-a")
            .with_instance_id("instance-a");

        assert_eq!(consumer.client_id, "consumer-a");
        assert_eq!(consumer.security_protocol, SecurityProtocol::SaslSsl);
        assert_eq!(consumer.sasl.mechanism, SaslMechanism::ScramSha512);
        assert_eq!(consumer.request_timeout, Duration::from_secs(3));
        assert_eq!(consumer.metadata_max_age, Duration::from_secs(4));
        assert_eq!(consumer.retry_backoff, Duration::from_millis(5));
        assert_eq!(consumer.max_retries, 6);
        assert_eq!(consumer.rebalance_timeout, Duration::from_secs(7));
        assert_eq!(consumer.fetch_max_wait, Duration::from_millis(8));
        assert_eq!(consumer.fetch_min_bytes, 9);
        assert_eq!(consumer.fetch_max_bytes, 10);
        assert_eq!(consumer.partition_max_bytes, 11);
        assert_eq!(consumer.auto_offset_reset, AutoOffsetReset::Latest);
        assert_eq!(consumer.isolation_level, IsolationLevel::ReadCommitted);
        assert!(consumer.enable_auto_commit);
        assert_eq!(consumer.auto_commit_interval, Duration::from_secs(12));
        assert_eq!(consumer.server_assignor.as_deref(), Some("range"));
        assert_eq!(consumer.rack_id.as_deref(), Some("rack-a"));
        assert_eq!(consumer.instance_id.as_deref(), Some("instance-a"));
    }
}