use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use crate::admin::KafkaAdmin;
use crate::consumer::KafkaConsumer;
use crate::network::TcpConnector;
use crate::producer::KafkaProducer;
use crate::{
AdminConfig, AutoOffsetReset, ConsumerConfig, IsolationLevel, ProducerCompression,
ProducerConfig, Result, SaslConfig, SecurityProtocol, TlsConfig,
};
use crate::{ConsumerRebalanceEvent, ConsumerRebalanceListener};
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Debug, Clone)]
pub struct KafkaClient {
bootstrap_servers: Vec<String>,
}
impl KafkaClient {
pub fn new(bootstrap_server: impl Into<String>) -> Self {
Self {
bootstrap_servers: vec![bootstrap_server.into()],
}
}
pub fn with_bootstrap_servers(
mut self,
servers: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
self.bootstrap_servers = servers.into_iter().map(Into::into).collect();
self
}
pub fn topic(&self, topic: impl Into<String>) -> KafkaTopic {
KafkaTopic {
bootstrap_servers: self.bootstrap_servers.clone(),
topic: topic.into(),
}
}
pub fn topics<I, S>(&self, topics: I) -> KafkaTopics
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
KafkaTopics {
bootstrap_servers: self.bootstrap_servers.clone(),
topics: topics.into_iter().map(Into::into).collect(),
}
}
pub fn producer(&self) -> ProducerBuilder {
ProducerBuilder::from_servers(self.bootstrap_servers.clone())
}
pub fn admin(&self) -> AdminBuilder {
AdminBuilder::from_servers(self.bootstrap_servers.clone())
}
pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
}
}
#[derive(Debug, Clone)]
pub struct KafkaTopic {
bootstrap_servers: Vec<String>,
topic: String,
}
impl KafkaTopic {
pub fn producer(&self) -> ProducerBuilder {
ProducerBuilder::from_servers(self.bootstrap_servers.clone())
.with_default_topic(self.topic.clone())
}
pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
.with_topic(self.topic.clone())
}
}
#[derive(Debug, Clone)]
pub struct KafkaTopics {
bootstrap_servers: Vec<String>,
topics: Vec<String>,
}
impl KafkaTopics {
pub fn consumer(&self, group_id: impl Into<String>) -> ConsumerBuilder {
ConsumerBuilder::from_servers(self.bootstrap_servers.clone(), group_id)
.with_topics(self.topics.clone())
}
}
#[derive(Debug, Clone)]
pub struct ProducerBuilder {
config: ProducerConfig,
default_topic: Option<String>,
default_partition: Option<i32>,
}
impl ProducerBuilder {
fn new(bootstrap_server: impl Into<String>) -> Self {
Self {
config: ProducerConfig::new(bootstrap_server),
default_topic: None,
default_partition: None,
}
}
fn from_servers(servers: Vec<String>) -> Self {
let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
builder.config = builder.config.with_bootstrap_servers(servers);
builder
}
pub fn with_bootstrap_servers(
mut self,
servers: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
self.config = self.config.with_bootstrap_servers(servers);
self
}
pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
self.config = self.config.with_client_id(client_id);
self
}
pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
self.config = self.config.with_security_protocol(security_protocol);
self
}
pub fn with_tls(mut self, tls: TlsConfig) -> Self {
self.config = self.config.with_tls(tls);
self
}
pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
self.config = self.config.with_sasl(sasl);
self
}
pub fn with_sasl_plain(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_plain(username, password);
self
}
pub fn with_sasl_scram_sha_256(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_scram_sha_256(username, password);
self
}
pub fn with_sasl_scram_sha_512(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_scram_sha_512(username, password);
self
}
pub fn with_compression(mut self, compression: ProducerCompression) -> Self {
self.config = self.config.with_compression(compression);
self
}
pub fn with_default_topic(mut self, topic: impl Into<String>) -> Self {
self.default_topic = Some(topic.into());
self
}
pub fn with_default_partition(mut self, partition: i32) -> Self {
self.default_partition = Some(partition);
self
}
pub fn with_enable_idempotence(mut self, enable_idempotence: bool) -> Self {
self.config = self.config.with_enable_idempotence(enable_idempotence);
self
}
pub fn with_batch_size(mut self, batch_size: usize) -> Self {
self.config = self.config.with_batch_size(batch_size);
self
}
pub fn with_linger(mut self, linger: Duration) -> Self {
self.config = self.config.with_linger(linger);
self
}
pub fn with_delivery_timeout(mut self, delivery_timeout: Duration) -> Self {
self.config = self.config.with_delivery_timeout(delivery_timeout);
self
}
pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
self.config = self.config.with_request_timeout(request_timeout);
self
}
pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
self.config = self.config.with_retry_backoff(retry_backoff);
self
}
pub fn with_max_retries(mut self, max_retries: usize) -> Self {
self.config = self.config.with_max_retries(max_retries);
self
}
pub fn with_max_in_flight_requests_per_connection(mut self, max_in_flight: usize) -> Self {
self.config = self
.config
.with_max_in_flight_requests_per_connection(max_in_flight);
self
}
pub fn with_transactional_id(mut self, transactional_id: impl Into<String>) -> Self {
self.config = self.config.with_transactional_id(transactional_id);
self
}
pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
self.config = self.config.with_tcp_connector(tcp_connector);
self
}
pub async fn connect(self) -> Result<KafkaProducer> {
let producer = KafkaProducer::connect(self.config).await?;
Ok(producer.with_defaults(self.default_topic, self.default_partition))
}
}
#[derive(Debug, Clone)]
pub struct AdminBuilder {
config: AdminConfig,
}
impl AdminBuilder {
fn new(bootstrap_server: impl Into<String>) -> Self {
Self {
config: AdminConfig::new(bootstrap_server),
}
}
fn from_servers(servers: Vec<String>) -> Self {
let mut builder = Self::new(servers.first().cloned().unwrap_or_default());
builder.config = builder.config.with_bootstrap_servers(servers);
builder
}
pub fn with_bootstrap_servers(
mut self,
servers: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
self.config = self.config.with_bootstrap_servers(servers);
self
}
pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
self.config = self.config.with_client_id(client_id);
self
}
pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
self.config = self.config.with_request_timeout(request_timeout);
self
}
pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
self.config = self.config.with_security_protocol(security_protocol);
self
}
pub fn with_tls(mut self, tls: TlsConfig) -> Self {
self.config = self.config.with_tls(tls);
self
}
pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
self.config = self.config.with_sasl(sasl);
self
}
pub fn with_sasl_plain(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_plain(username, password);
self
}
pub fn with_sasl_scram_sha_256(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_scram_sha_256(username, password);
self
}
pub fn with_sasl_scram_sha_512(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_scram_sha_512(username, password);
self
}
pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
self.config = self.config.with_tcp_connector(tcp_connector);
self
}
pub async fn connect(self) -> Result<KafkaAdmin> {
KafkaAdmin::connect(self.config).await
}
}
#[derive(Debug, Clone)]
pub struct ConsumerBuilder {
config: ConsumerConfig,
topics: Vec<String>,
poll_timeout: Duration,
}
impl ConsumerBuilder {
fn new(bootstrap_server: impl Into<String>, group_id: impl Into<String>) -> Self {
Self {
config: ConsumerConfig::new(bootstrap_server, group_id),
topics: Vec::new(),
poll_timeout: DEFAULT_POLL_TIMEOUT,
}
}
fn from_servers(servers: Vec<String>, group_id: impl Into<String>) -> Self {
let first = servers.first().cloned().unwrap_or_default();
let mut builder = Self::new(first, group_id);
builder.config = builder.config.with_bootstrap_servers(servers);
builder
}
pub fn with_bootstrap_servers(
mut self,
servers: impl IntoIterator<Item = impl Into<String>>,
) -> Self {
self.config = self.config.with_bootstrap_servers(servers);
self
}
pub fn with_client_id(mut self, client_id: impl Into<String>) -> Self {
self.config = self.config.with_client_id(client_id);
self
}
pub fn with_security_protocol(mut self, security_protocol: SecurityProtocol) -> Self {
self.config = self.config.with_security_protocol(security_protocol);
self
}
pub fn with_tls(mut self, tls: TlsConfig) -> Self {
self.config = self.config.with_tls(tls);
self
}
pub fn with_sasl(mut self, sasl: SaslConfig) -> Self {
self.config = self.config.with_sasl(sasl);
self
}
pub fn with_sasl_plain(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_plain(username, password);
self
}
pub fn with_sasl_scram_sha_256(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_scram_sha_256(username, password);
self
}
pub fn with_sasl_scram_sha_512(
mut self,
username: impl Into<String>,
password: impl Into<String>,
) -> Self {
self.config = self.config.with_sasl_scram_sha_512(username, password);
self
}
pub fn with_topic(mut self, topic: impl Into<String>) -> Self {
self.topics.push(topic.into());
self
}
pub fn with_topics<I, S>(mut self, topics: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.topics.extend(topics.into_iter().map(Into::into));
self
}
pub fn with_auto_offset_reset(mut self, auto_offset_reset: AutoOffsetReset) -> Self {
self.config = self.config.with_auto_offset_reset(auto_offset_reset);
self
}
pub fn with_isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
self.config = self.config.with_isolation_level(isolation_level);
self
}
pub fn with_auto_commit(mut self, enable_auto_commit: bool) -> Self {
self.config = self.config.with_enable_auto_commit(enable_auto_commit);
self
}
pub fn with_request_timeout(mut self, request_timeout: Duration) -> Self {
self.config = self.config.with_request_timeout(request_timeout);
self
}
pub fn with_retry_backoff(mut self, retry_backoff: Duration) -> Self {
self.config = self.config.with_retry_backoff(retry_backoff);
self
}
pub fn with_max_retries(mut self, max_retries: usize) -> Self {
self.config = self.config.with_max_retries(max_retries);
self
}
pub fn with_instance_id(mut self, instance_id: impl Into<String>) -> Self {
self.config = self.config.with_instance_id(instance_id);
self
}
pub fn with_rebalance_listener(mut self, listener: ConsumerRebalanceListener) -> Self {
self.config = self.config.with_rebalance_listener(listener);
self
}
pub fn with_rebalance_callback(
mut self,
callback: impl Fn(ConsumerRebalanceEvent) + Send + Sync + 'static,
) -> Self {
self.config = self.config.with_rebalance_callback(callback);
self
}
pub fn with_tcp_connector(mut self, tcp_connector: Arc<dyn TcpConnector>) -> Self {
self.config = self.config.with_tcp_connector(tcp_connector);
self
}
pub fn with_poll_timeout(mut self, poll_timeout: Duration) -> Self {
self.poll_timeout = poll_timeout;
self
}
pub async fn connect(self) -> Result<KafkaConsumer> {
let topics = self
.topics
.into_iter()
.map(validate_topic_name)
.collect::<Result<Vec<_>>>()?;
let consumer = KafkaConsumer::connect(self.config).await?;
if !topics.is_empty()
&& let Err(error) = consumer.subscribe(topics).await
{
let _ = consumer.shutdown().await;
return Err(error);
}
Ok(consumer.with_default_poll_timeout(self.poll_timeout))
}
}
fn validate_topic_name(topic: String) -> Result<String> {
let topic = topic.trim();
if topic.is_empty() {
return Err(anyhow!("topic must be non-empty").into());
}
Ok(topic.to_owned())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::SaslMechanism;
#[test]
fn client_builders_preserve_bootstrap_servers_and_topic_defaults() {
let client =
KafkaClient::new("host-a:9092").with_bootstrap_servers(["host-a:9092", "host-b:9092"]);
let producer = client
.topic("orders")
.producer()
.with_client_id("producer-a")
.with_default_partition(2)
.with_compression(ProducerCompression::Lz4)
.with_batch_size(32)
.with_linger(Duration::from_millis(10))
.with_delivery_timeout(Duration::from_secs(3))
.with_transactional_id("tx-a");
assert_eq!(
producer.config.bootstrap_servers,
vec!["host-a:9092", "host-b:9092"]
);
assert_eq!(producer.config.client_id, "producer-a");
assert_eq!(producer.config.compression, ProducerCompression::Lz4);
assert_eq!(producer.config.batch_size, 32);
assert_eq!(producer.config.linger, Duration::from_millis(10));
assert_eq!(producer.config.delivery_timeout, Duration::from_secs(3));
assert_eq!(producer.config.transactional_id.as_deref(), Some("tx-a"));
assert_eq!(producer.default_topic.as_deref(), Some("orders"));
assert_eq!(producer.default_partition, Some(2));
}
#[test]
fn admin_builder_forwards_security_and_timeout_options() {
let builder = KafkaClient::new("host-a:9092")
.admin()
.with_bootstrap_servers(["host-b:9092", "host-c:9092"])
.with_client_id("admin-a")
.with_request_timeout(Duration::from_secs(9))
.with_security_protocol(SecurityProtocol::Ssl)
.with_tls(TlsConfig::new().with_server_name("kafka.internal"))
.with_sasl_scram_sha_512("user-a", "secret-a");
assert_eq!(
builder.config.bootstrap_servers,
vec!["host-b:9092", "host-c:9092"]
);
assert_eq!(builder.config.client_id, "admin-a");
assert_eq!(builder.config.request_timeout, Duration::from_secs(9));
assert_eq!(builder.config.security_protocol, SecurityProtocol::SaslSsl);
assert_eq!(
builder.config.tls.server_name.as_deref(),
Some("kafka.internal")
);
assert_eq!(builder.config.sasl.mechanism, SaslMechanism::ScramSha512);
}
#[test]
fn consumer_builder_collects_topics_and_group_options() {
let builder = KafkaClient::new("host-a:9092")
.topic("orders")
.consumer("group-a")
.with_bootstrap_servers(["host-b:9092"])
.with_client_id("consumer-a")
.with_topic("payments")
.with_topics(["shipments", "invoices"])
.with_auto_offset_reset(AutoOffsetReset::Latest)
.with_isolation_level(IsolationLevel::ReadCommitted)
.with_auto_commit(true)
.with_instance_id("instance-a")
.with_poll_timeout(Duration::from_millis(250))
.with_sasl_plain("user-a", "secret-a");
assert_eq!(builder.config.bootstrap_servers, vec!["host-b:9092"]);
assert_eq!(builder.config.client_id, "consumer-a");
assert_eq!(builder.config.group_id, "group-a");
assert_eq!(
builder.topics,
vec!["orders", "payments", "shipments", "invoices"]
);
assert_eq!(builder.config.auto_offset_reset, AutoOffsetReset::Latest);
assert_eq!(
builder.config.isolation_level,
IsolationLevel::ReadCommitted
);
assert!(builder.config.enable_auto_commit);
assert_eq!(builder.config.instance_id.as_deref(), Some("instance-a"));
assert_eq!(builder.poll_timeout, Duration::from_millis(250));
assert_eq!(
builder.config.security_protocol,
SecurityProtocol::SaslPlaintext
);
}
#[test]
fn topics_facade_collects_multi_topic_consumer_defaults() {
let builder = KafkaClient::new("host-a:9092")
.with_bootstrap_servers(["host-a:9092", "host-b:9092"])
.topics(["orders", "payments"])
.consumer("group-a")
.with_topic("shipments");
assert_eq!(
builder.config.bootstrap_servers,
vec!["host-a:9092", "host-b:9092"]
);
assert_eq!(builder.config.group_id, "group-a");
assert_eq!(builder.topics, vec!["orders", "payments", "shipments"]);
}
#[test]
fn validate_topic_name_trims_and_rejects_empty_names() {
assert_eq!(
validate_topic_name(" topic-a ".to_owned()).unwrap(),
"topic-a"
);
assert!(validate_topic_name(" ".to_owned()).is_err());
}
}