mod admin;
mod config;
mod metrics;
mod producer;
mod token;
pub mod topic_resolver;
pub use admin::{KafkaAdmin, TopicInfo};
#[allow(deprecated)]
pub use config::{
DEVTEST_PROFILE, HIGH_THROUGHPUT_CONSUMER_DEFAULTS, KafkaConfig, KafkaProfile,
LOW_LATENCY_CONSUMER_DEFAULTS, PRODUCER_DEFAULTS, PRODUCER_DEVTEST, PRODUCER_EXACTLY_ONCE,
PRODUCER_HIGH_THROUGHPUT, PRODUCER_LOW_LATENCY, PRODUCTION_PROFILE, SuppressionRule,
merge_with_overrides,
};
pub use metrics::{
BrokerMetrics, KafkaMetrics, StatsContext, healthy_broker_count, total_consumer_lag,
};
pub use producer::{KafkaProducer, ProducerMetrics, ProducerProfile};
pub use token::KafkaToken;
pub use topic_resolver::{TopicRefreshHandle, TopicResolver};
use super::error::{TransportError, TransportResult};
use super::traits::{TransportBase, TransportReceiver, TransportSender};
use super::types::{Message, PayloadFormat, SendResult};
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::message::Message as KafkaMessage;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use rdkafka::util::Timeout;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
/// Tuning knobs for the batched Kafka receive path.
pub mod tuning {
    /// Default batch size in messages; presumably the caller-side default
    /// for `recv(max)` — not referenced within this file, confirm at call sites.
    pub const DEFAULT_BATCH_SIZE: usize = 10_000;
    /// Upper bound (ms) on the non-blocking drain loop inside `recv`.
    pub const MAX_DRAIN_MS: u64 = 100;
    /// Blocking timeout (ms) for the first consumer poll inside `recv`.
    pub const POLL_TIMEOUT_MS: u64 = 50;
    /// Cap on the initial `Vec` capacity allocated for a receive batch.
    pub const INITIAL_BATCH_CAPACITY: usize = 10_000;
}
/// Kafka-backed transport: one polling consumer plus one async producer
/// created from a shared librdkafka configuration, with optional topic
/// auto-discovery/refresh and payload filtering in both directions.
pub struct KafkaTransport {
    /// Polling consumer; `StatsContext` captures librdkafka statistics callbacks.
    consumer: BaseConsumer<StatsContext>,
    /// Async producer used by `send`.
    producer: FutureProducer<StatsContext>,
    /// Interned topic names (owned name -> shared `Arc<str>`), built in `new`
    /// and cloned per `recv` call so interning needs no lock on `self`.
    topic_cache: HashMap<String, Arc<str>>,
    /// Set by `close`; `send`/`recv` refuse to run once true.
    closed: AtomicBool,
    /// Liveness flag: cleared by `close`; read by `is_healthy` and (when the
    /// `health` feature is on) by the registered health probe.
    healthy: Arc<AtomicBool>,
    /// Current subscription set; replaced when the refresh loop reports changes.
    subscribed_topics: parking_lot::RwLock<Vec<String>>,
    /// Cancelled by `close`; also stops the background topic-refresh loop.
    shutdown_token: tokio_util::sync::CancellationToken,
    /// Present only when auto-discovery started a periodic refresh loop;
    /// polled at the top of `recv`.
    topic_refresh: Option<parking_lot::Mutex<TopicRefreshHandle>>,
    /// Inbound/outbound payload filters.
    filter_engine: super::filter::TransportFilterEngine,
    /// DLQ entries staged by inbound filtering in `recv`; drained by
    /// `take_filtered_dlq_entries`.
    filtered_dlq_buffer: parking_lot::Mutex<Vec<super::filter::FilteredDlqEntry>>,
}
impl KafkaTransport {
    /// Creates a consumer/producer pair from `config`, optionally
    /// auto-discovering topics from the broker, and subscribes the consumer
    /// to the resolved topic set.
    ///
    /// # Errors
    /// - `TransportError::Connection` if client creation or subscription fails.
    /// - `TransportError::Config` if auto-discovery finds no matching topics.
    /// - Propagates any error from filter-engine construction.
    pub async fn new(config: &KafkaConfig) -> TransportResult<Self> {
        let mut client_config = ClientConfig::new();
        // Typed consumer settings mapped directly onto librdkafka properties.
        client_config.set("bootstrap.servers", config.brokers.join(","));
        client_config.set("group.id", &config.group);
        client_config.set("enable.auto.commit", config.enable_auto_commit.to_string());
        client_config.set(
            "auto.commit.interval.ms",
            config.auto_commit_interval_ms.to_string(),
        );
        client_config.set("session.timeout.ms", config.session_timeout_ms.to_string());
        client_config.set(
            "heartbeat.interval.ms",
            config.heartbeat_interval_ms.to_string(),
        );
        client_config.set(
            "max.poll.interval.ms",
            config.max_poll_interval_ms.to_string(),
        );
        client_config.set("fetch.min.bytes", config.fetch_min_bytes.to_string());
        client_config.set("fetch.max.bytes", config.fetch_max_bytes.to_string());
        client_config.set(
            "max.partition.fetch.bytes",
            config.max_partition_fetch_bytes.to_string(),
        );
        client_config.set("auto.offset.reset", &config.auto_offset_reset);
        client_config.set(
            "enable.partition.eof",
            config.enable_partition_eof.to_string(),
        );
        // Raw librdkafka overrides from the profile; applied after the typed
        // settings above, so they take precedence over them.
        let rdkafka_config = config.build_librdkafka_config();
        for (key, value) in &rdkafka_config {
            client_config.set(key, value);
        }
        // Security: SASL/SSL properties are only set when configured.
        client_config.set("security.protocol", &config.security_protocol);
        if let Some(ref mechanism) = config.sasl_mechanism {
            client_config.set("sasl.mechanism", mechanism);
        }
        if let Some(ref username) = config.sasl_username {
            client_config.set("sasl.username", username);
        }
        if let Some(ref password) = config.sasl_password {
            // `expose()` unwraps the secret-wrapped password only at the
            // point of use.
            client_config.set("sasl.password", password.expose());
        }
        if let Some(ref ca) = config.ssl_ca_location {
            client_config.set("ssl.ca.location", ca);
        }
        if let Some(ref cert) = config.ssl_certificate_location {
            client_config.set("ssl.certificate.location", cert);
        }
        if let Some(ref key) = config.ssl_key_location {
            client_config.set("ssl.key.location", key);
        }
        if config.ssl_skip_verify {
            client_config.set("enable.ssl.certificate.verification", "false");
        }
        client_config.set("client.id", &config.client_id);
        // StatsContext relies on periodic statistics callbacks; default to 5s
        // unless the profile overrides already configured an interval.
        if client_config.get("statistics.interval.ms").is_none() {
            client_config.set("statistics.interval.ms", "5000");
        }
        let consumer: BaseConsumer<StatsContext> = client_config
            .create_with_context(StatsContext::new())
            .map_err(|e| TransportError::Connection(format!("Failed to create consumer: {e}")))?;
        // Resolve the topic set: explicit list from config, or broker-side
        // discovery when the list is empty and auto-discovery is enabled
        // (optionally with a background refresh loop tied to the shutdown token).
        let (effective_topics, topic_refresh, shutdown_token) =
            if config.topics.is_empty() && config.auto_discover {
                tracing::info!("Topics empty — auto-discovering from broker");
                let resolver = topic_resolver::TopicResolver::new(config)?;
                let discovered = resolver.resolve()?;
                if discovered.is_empty() {
                    return Err(TransportError::Config(
                        "Auto-discovery found no matching topics".into(),
                    ));
                }
                let token = tokio_util::sync::CancellationToken::new();
                let refresh = if config.topic_refresh_secs > 0 {
                    // Separate resolver instance: the refresh loop takes
                    // ownership of it.
                    let refresh_resolver = topic_resolver::TopicResolver::new(config)?;
                    let handle = refresh_resolver.start_refresh_loop(
                        Duration::from_secs(config.topic_refresh_secs),
                        token.clone(),
                    );
                    tracing::info!(
                        interval_secs = config.topic_refresh_secs,
                        "Started periodic topic refresh"
                    );
                    Some(parking_lot::Mutex::new(handle))
                } else {
                    None
                };
                (discovered, refresh, token)
            } else {
                (
                    config.topics.clone(),
                    None,
                    tokio_util::sync::CancellationToken::new(),
                )
            };
        let subscribed_topics = effective_topics;
        if !subscribed_topics.is_empty() {
            let topics: Vec<&str> = subscribed_topics.iter().map(String::as_str).collect();
            consumer
                .subscribe(&topics)
                .map_err(|e| TransportError::Connection(format!("Failed to subscribe: {e}")))?;
        }
        // Pre-intern the subscribed topic names so `recv` can hand out cheap
        // Arc<str> clones instead of allocating per message.
        let mut topic_cache = HashMap::with_capacity(subscribed_topics.len());
        for topic in &subscribed_topics {
            topic_cache.insert(topic.clone(), Arc::from(topic.as_str()));
        }
        // NOTE(review): the producer reuses the consumer's `client_config`,
        // including consumer-only keys such as group.id — confirm this is
        // intentional rather than building a dedicated producer config.
        let producer: FutureProducer<StatsContext> = client_config
            .create_with_context(StatsContext::new())
            .map_err(|e| TransportError::Connection(format!("Failed to create producer: {e}")))?;
        let healthy = Arc::new(AtomicBool::new(true));
        let filter_engine = super::filter::TransportFilterEngine::new(
            &config.filters_in,
            &config.filters_out,
            &crate::transport::filter::TransportFilterTierConfig::default(),
        )?;
        // Report liveness through the shared `healthy` flag when the health
        // feature is compiled in.
        #[cfg(feature = "health")]
        {
            let h = Arc::clone(&healthy);
            crate::health::HealthRegistry::register("transport:kafka", move || {
                if h.load(Ordering::Relaxed) {
                    crate::health::HealthStatus::Healthy
                } else {
                    crate::health::HealthStatus::Unhealthy
                }
            });
        }
        Ok(Self {
            consumer,
            producer,
            topic_cache,
            closed: AtomicBool::new(false),
            healthy,
            subscribed_topics: parking_lot::RwLock::new(subscribed_topics),
            shutdown_token,
            topic_refresh,
            filter_engine,
            filtered_dlq_buffer: parking_lot::Mutex::new(Vec::new()),
        })
    }

    /// Snapshot of the metrics accumulated by the consumer's `StatsContext`.
    #[must_use]
    pub fn stats(&self) -> KafkaMetrics {
        self.consumer.context().get_metrics()
    }
}
impl TransportBase for KafkaTransport {
    /// Marks the transport closed and unhealthy, then cancels the shutdown
    /// token so background work (e.g. the topic-refresh loop) stops.
    /// Always succeeds.
    async fn close(&self) -> TransportResult<()> {
        let order = Ordering::Relaxed;
        self.closed.store(true, order);
        self.healthy.store(false, order);
        self.shutdown_token.cancel();
        Ok(())
    }

    /// Current liveness flag; cleared by `close`.
    fn is_healthy(&self) -> bool {
        self.healthy.load(Ordering::Relaxed)
    }

    /// Static transport identifier.
    fn name(&self) -> &'static str {
        "kafka"
    }
}
impl TransportSender for KafkaTransport {
async fn send(&self, key: &str, payload: &[u8]) -> SendResult {
if self.closed.load(Ordering::Relaxed) {
return SendResult::Fatal(TransportError::Closed);
}
if self.filter_engine.has_outbound_filters() {
match self.filter_engine.apply_outbound(payload) {
super::filter::FilterDisposition::Pass => {}
super::filter::FilterDisposition::Drop => return SendResult::Ok,
super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
}
}
let record: FutureRecord<'_, str, [u8]> = FutureRecord::to(key).payload(payload);
#[cfg(feature = "otel")]
let record = if let Some(tp) = super::propagation::current_traceparent() {
let headers = rdkafka::message::OwnedHeaders::new().insert(rdkafka::message::Header {
key: super::propagation::TRACEPARENT_HEADER,
value: Some(tp.as_str()),
});
record.headers(headers)
} else {
record
};
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();
let result = match self
.producer
.send(record, Timeout::After(Duration::from_secs(5)))
.await
{
Ok(_) => {
#[cfg(feature = "metrics")]
::metrics::counter!("dfe_transport_sent_total", "transport" => "kafka")
.increment(1);
SendResult::Ok
}
Err((err, _)) => {
let err_str = err.to_string();
if err_str.contains("queue full") || err_str.contains("Local: Queue full") {
#[cfg(feature = "metrics")]
::metrics::counter!(
"dfe_transport_backpressured_total",
"transport" => "kafka"
)
.increment(1);
SendResult::Backpressured
} else {
#[cfg(feature = "metrics")]
::metrics::counter!(
"dfe_transport_send_errors_total",
"transport" => "kafka"
)
.increment(1);
SendResult::Fatal(TransportError::Send(err_str))
}
}
};
#[cfg(feature = "metrics")]
::metrics::histogram!(
"dfe_transport_send_duration_seconds",
"transport" => "kafka"
)
.record(start.elapsed().as_secs_f64());
result
}
}
impl TransportReceiver for KafkaTransport {
type Token = KafkaToken;
async fn recv(&self, max: usize) -> TransportResult<Vec<Message<Self::Token>>> {
if self.closed.load(Ordering::Relaxed) {
return Err(TransportError::Closed);
}
if let Some(ref refresh) = self.topic_refresh
&& let Some(new_topics) = refresh.lock().check_changed()
{
let topics: Vec<&str> = new_topics.iter().map(String::as_str).collect();
match self.consumer.subscribe(&topics) {
Ok(()) => {
tracing::info!(?new_topics, "Re-subscribed after topic refresh");
*self.subscribed_topics.write() = new_topics;
}
Err(e) => {
tracing::warn!(error = %e, "Failed to re-subscribe after topic refresh");
}
}
}
let timeout = Duration::from_millis(tuning::POLL_TIMEOUT_MS);
let max_msgs = max;
let mut local_cache = self.topic_cache.clone();
let mut messages = Vec::with_capacity(max_msgs.min(tuning::INITIAL_BATCH_CAPACITY));
let drain_deadline =
std::time::Instant::now() + Duration::from_millis(tuning::MAX_DRAIN_MS);
if let Some(result) = self.consumer.poll(timeout) {
match result {
Ok(msg) => {
#[cfg(feature = "otel")]
if let Some(headers) = msg.headers() {
use rdkafka::message::Headers;
for idx in 0..headers.count() {
if let Some(Ok(header)) = headers.try_get_as::<[u8]>(idx)
&& header.key == super::propagation::TRACEPARENT_HEADER
{
if let Some(value) = header.value
&& let Ok(tp) = std::str::from_utf8(value)
&& super::propagation::is_valid_traceparent(tp)
{
tracing::Span::current().record("traceparent", tp);
}
break;
}
}
}
let topic_str = msg.topic();
let topic: Arc<str> = get_or_insert_topic(&mut local_cache, topic_str);
let payload = msg.payload().map_or_else(Vec::new, |p| p.to_vec());
let partition = msg.partition();
let offset = msg.offset();
let timestamp_ms = msg.timestamp().to_millis();
messages.push(Message {
key: Some(topic.clone()),
payload,
token: KafkaToken::new(topic, partition, offset),
timestamp_ms,
format: PayloadFormat::Auto,
});
}
Err(e) => {
return Err(TransportError::Recv(e.to_string()));
}
}
} else {
return Ok(messages);
}
while messages.len() < max_msgs {
if std::time::Instant::now() >= drain_deadline {
break;
}
match self.consumer.poll(Duration::ZERO) {
Some(Ok(msg)) => {
let topic_str = msg.topic();
let topic: Arc<str> = get_or_insert_topic(&mut local_cache, topic_str);
let payload = msg.payload().map_or_else(Vec::new, |p| p.to_vec());
let partition = msg.partition();
let offset = msg.offset();
let timestamp_ms = msg.timestamp().to_millis();
messages.push(Message {
key: Some(topic.clone()),
payload,
token: KafkaToken::new(topic, partition, offset),
timestamp_ms,
format: PayloadFormat::Auto,
});
}
Some(Err(e)) => {
if messages.is_empty() {
return Err(TransportError::Recv(e.to_string()));
}
break;
}
None => break,
}
}
if self.filter_engine.has_inbound_filters() {
let mut staged_dlq: Vec<super::filter::FilteredDlqEntry> = Vec::new();
messages.retain(|msg| match self.filter_engine.apply_inbound(&msg.payload) {
super::filter::FilterDisposition::Pass => true,
super::filter::FilterDisposition::Drop => false,
super::filter::FilterDisposition::Dlq => {
staged_dlq.push(super::filter::FilteredDlqEntry {
payload: msg.payload.clone(),
key: msg.key.clone(),
reason: "transport filter".to_string(),
});
false
}
});
if !staged_dlq.is_empty() {
self.filtered_dlq_buffer.lock().extend(staged_dlq);
}
}
Ok(messages)
}
fn take_filtered_dlq_entries(&self) -> Vec<super::filter::FilteredDlqEntry> {
std::mem::take(&mut *self.filtered_dlq_buffer.lock())
}
async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
if tokens.is_empty() {
return Ok(());
}
let mut tpl = TopicPartitionList::new();
let mut partition_offsets: HashMap<(&str, i32), i64> =
HashMap::with_capacity(tokens.len() / 100);
for token in tokens {
let key = (token.topic.as_ref(), token.partition);
partition_offsets
.entry(key)
.and_modify(|current| {
if token.offset > *current {
*current = token.offset;
}
})
.or_insert(token.offset);
}
for ((topic, partition), offset) in partition_offsets {
tpl.add_partition_offset(topic, partition, Offset::Offset(offset + 1))
.map_err(|e| TransportError::Commit(format!("Failed to build TPL: {e}")))?;
}
self.consumer
.commit(&tpl, CommitMode::Async)
.map_err(|e| TransportError::Commit(e.to_string()))?;
Ok(())
}
}
// Returns the interned `Arc<str>` for `topic`, creating and caching it on
// first sight. The entry API is avoided deliberately: it would require
// allocating an owned `String` key even on the (common) cache-hit path.
#[inline]
fn get_or_insert_topic(cache: &mut HashMap<String, Arc<str>>, topic: &str) -> Arc<str> {
    match cache.get(topic) {
        Some(existing) => Arc::clone(existing),
        None => {
            let interned: Arc<str> = Arc::from(topic);
            cache.insert(topic.to_owned(), Arc::clone(&interned));
            interned
        }
    }
}
impl std::fmt::Debug for KafkaTransport {
    /// Debug view exposing only cheap, lock-guarded state; marked
    /// non-exhaustive since the remaining fields are not Debug-friendly.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let topics = self.subscribed_topics.read();
        let mut out = f.debug_struct("KafkaTransport");
        out.field("subscribed_topics", &*topics);
        out.field("closed", &self.closed.load(Ordering::Relaxed));
        out.field("healthy", &self.healthy.load(Ordering::Relaxed));
        out.field("topic_refresh_active", &self.topic_refresh.is_some());
        out.finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_tuning_constants() {
        // Pin the tuning knobs so accidental edits are caught.
        assert_eq!(tuning::DEFAULT_BATCH_SIZE, 10_000);
        assert_eq!(tuning::MAX_DRAIN_MS, 100);
        assert_eq!(tuning::POLL_TIMEOUT_MS, 50);
    }

    #[test]
    fn test_get_or_insert_topic_cached() {
        // Repeated lookups must hand back the same interned Arc.
        let mut interned = HashMap::new();
        interned.insert("events".to_string(), Arc::from("events"));
        let first = get_or_insert_topic(&mut interned, "events");
        let second = get_or_insert_topic(&mut interned, "events");
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn test_get_or_insert_topic_new() {
        // Unknown topics are interned and cached on first sight.
        let mut interned = HashMap::new();
        let entry = get_or_insert_topic(&mut interned, "new-topic");
        assert_eq!(&*entry, "new-topic");
        assert!(interned.contains_key("new-topic"));
    }

    #[test]
    fn test_kafka_config_defaults() {
        let defaults = KafkaConfig::default();
        assert_eq!(defaults.fetch_max_bytes, 52_428_800);
        assert!(!defaults.enable_auto_commit);
    }

    #[tokio::test]
    async fn test_topic_refresh_check_changed_detects_updates() {
        let (sender, receiver) = tokio::sync::watch::channel(vec!["events_load".to_string()]);
        let mut handle = topic_resolver::TopicRefreshHandle::new_for_test(receiver);
        // Nothing new beyond the initial value yet.
        assert!(handle.check_changed().is_none());
        sender
            .send(vec!["events_load".to_string(), "logs_load".to_string()])
            .unwrap();
        let update = handle.check_changed();
        assert!(update.is_some());
        let observed = update.unwrap();
        assert_eq!(observed.len(), 2);
        assert!(observed.contains(&"logs_load".to_string()));
        // A change is reported only once.
        assert!(handle.check_changed().is_none());
    }

    #[test]
    fn test_subscribed_topics_rwlock_update() {
        let topics = parking_lot::RwLock::new(vec!["events_load".to_string()]);
        assert_eq!(topics.read().len(), 1);
        *topics.write() = vec!["events_load".to_string(), "logs_load".to_string()];
        assert_eq!(topics.read().len(), 2);
        assert_eq!(topics.read()[1], "logs_load");
    }
}