mod admin;
mod config;
mod metrics;
mod producer;
mod token;
pub mod topic_resolver;
pub use admin::{KafkaAdmin, TopicInfo};
#[allow(deprecated)]
pub use config::{
ConsumerKnobs, DEVTEST_PROFILE, HIGH_THROUGHPUT_CONSUMER_DEFAULTS, KafkaConfig, KafkaProfile,
KafkaSizingConfig, LOW_LATENCY_CONSUMER_DEFAULTS, PRODUCER_DEFAULTS, PRODUCER_DEVTEST,
PRODUCER_EXACTLY_ONCE, PRODUCER_HIGH_THROUGHPUT, PRODUCER_LOW_LATENCY, PRODUCTION_PROFILE,
ProducerKnobs, SelfRegulationProfile, SuppressionRule, merge_with_overrides,
};
pub use metrics::{
BrokerMetrics, KafkaMetrics, StatsContext, healthy_broker_count, total_consumer_lag,
};
pub use producer::{KafkaProducer, ProducerMetrics, ProducerProfile};
pub use token::KafkaToken;
pub use topic_resolver::{TopicRefreshHandle, TopicResolver};
use super::error::{TransportError, TransportResult};
use super::traits::{RecvBatch, TransportBase, TransportReceiver, TransportSender};
use super::types::{Message, PayloadFormat, SendResult};
use super::work_batch::WorkBatch;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer};
use rdkafka::message::Message as KafkaMessage;
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::topic_partition_list::{Offset, TopicPartitionList};
use rdkafka::util::Timeout;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
pub mod tuning {
    //! Receive-path tuning knobs for the Kafka transport.

    /// Default batch size, in records.
    pub const DEFAULT_BATCH_SIZE: usize = 10 * 1000;

    /// Wall-clock budget, in milliseconds, for one receive call to keep collecting records,
    /// measured from just before its first poll.
    pub const MAX_DRAIN_MS: u64 = 100;

    /// Timeout, in milliseconds, of the first (blocking) poll made by each receive call.
    pub const POLL_TIMEOUT_MS: u64 = 50;

    /// Most span slots a receive call pre-allocates, whatever record limit it was given.
    pub const INITIAL_BATCH_CAPACITY: usize = 10 * 1000;
}
/// Kafka implementation of the transport traits: a `BaseConsumer` for receiving and a
/// `FutureProducer` for sending, both built from the same client configuration.
pub struct KafkaTransport {
    /// Consumer used for receive/commit; shared (`Arc`) with the governor gate actuator.
    consumer: Arc<BaseConsumer<StatsContext>>,
    /// Producer used by `send`.
    producer: FutureProducer<StatsContext>,
    /// Interned topic names, so records of one topic share a single `Arc<str>`.
    topic_cache: parking_lot::RwLock<HashMap<String, Arc<str>>>,
    /// Set by `close`; sends and receives refuse to run once it is set.
    closed: AtomicBool,
    /// Health flag; shared with the `transport:kafka` health check (`health` feature).
    healthy: Arc<AtomicBool>,
    /// Topic set currently subscribed to; replaced after a successful topic refresh.
    subscribed_topics: parking_lot::RwLock<Vec<String>>,
    /// Cancelled on `close` and on drop; a clone is handed to the topic-refresh loop.
    shutdown_token: tokio_util::sync::CancellationToken,
    /// Receiving side of the periodic topic refresh, when one was started.
    topic_refresh: Option<parking_lot::Mutex<TopicRefreshHandle>>,
    /// Inbound/outbound payload filters.
    filter_engine: super::filter::TransportFilterEngine,
    /// Optional inbound pressure gate evaluated at the start of every receive.
    #[cfg(feature = "governor")]
    inbound_gate: Option<crate::governor::InboundGate>,
    /// Rate limiter for the partition-limited warning.
    #[cfg(feature = "governor")]
    partition_limited_warn: PartitionLimitedDiagnostic,
    /// Consumer group id, so group membership queries target only this group.
    #[cfg(feature = "governor")]
    group_id: String,
    /// Latest partition-limited verdict, read by the `kafka:partition_limited` health check.
    #[cfg(all(feature = "governor", feature = "health"))]
    partition_limited_flag: Arc<AtomicBool>,
    /// Whether the `kafka:partition_limited` health check has been registered yet.
    #[cfg(all(feature = "governor", feature = "health"))]
    partition_limited_health_registered: AtomicBool,
}

impl KafkaTransport {
    /// Create a Kafka transport from `config`.
    ///
    /// Validates the config, creates the consumer, resolves and subscribes to the topic set
    /// (auto-discovering it from the broker when `config.topics` is empty and
    /// `config.auto_discover` is set), then creates the producer and the filter engine and,
    /// with the `health` feature, registers a `transport:kafka` health check.
    ///
    /// NOTE(review): the producer is created from the same `ClientConfig` as the consumer, so
    /// consumer-only properties (e.g. `group.id`) are passed to it too; librdkafka is expected
    /// to ignore them with a warning — confirm.
    ///
    /// # Errors
    /// - `TransportError::Config` if validation fails or auto-discovery finds no topics.
    /// - `TransportError::Connection` if the consumer or producer cannot be created, or the
    ///   subscription fails.
    /// - Any error raised by topic resolution or filter-engine construction.
    pub async fn new(config: &KafkaConfig) -> TransportResult<Self> {
        config
            .validate(crate::env::is_production())
            .map_err(TransportError::Config)?;
        let client_config = Self::build_client_config(config);
        let consumer: BaseConsumer<StatsContext> = client_config
            .create_with_context(StatsContext::new())
            .map_err(|e| TransportError::Connection(format!("Failed to create consumer: {e}")))?;
        let consumer = Arc::new(consumer);
        let (subscribed_topics, topic_refresh, shutdown_token) = Self::resolve_topics(config)?;
        // A topic-refresh loop may already be running on `shutdown_token`. If a step below
        // fails, dropping this guard cancels the token so the loop does not outlive the
        // failed constructor; on success the guard is disarmed and the token moves into Self.
        let shutdown_guard = shutdown_token.drop_guard();
        if !subscribed_topics.is_empty() {
            let topics: Vec<&str> = subscribed_topics.iter().map(String::as_str).collect();
            consumer
                .subscribe(&topics)
                .map_err(|e| TransportError::Connection(format!("Failed to subscribe: {e}")))?;
        }
        let topic_cache: HashMap<String, Arc<str>> = subscribed_topics
            .iter()
            .map(|topic| (topic.clone(), Arc::from(topic.as_str())))
            .collect();
        let producer: FutureProducer<StatsContext> = client_config
            .create_with_context(StatsContext::new())
            .map_err(|e| TransportError::Connection(format!("Failed to create producer: {e}")))?;
        let healthy = Arc::new(AtomicBool::new(true));
        let filter_engine = super::filter::TransportFilterEngine::new(
            &config.filters_in,
            &config.filters_out,
            &crate::transport::filter::TransportFilterTierConfig::from_cascade(),
        )?;
        #[cfg(feature = "health")]
        {
            let h = Arc::clone(&healthy);
            crate::health::HealthRegistry::register("transport:kafka", move || {
                if h.load(Ordering::Relaxed) {
                    crate::health::HealthStatus::Healthy
                } else {
                    crate::health::HealthStatus::Unhealthy
                }
            });
        }
        Ok(Self {
            consumer,
            producer,
            topic_cache: parking_lot::RwLock::new(topic_cache),
            closed: AtomicBool::new(false),
            healthy,
            subscribed_topics: parking_lot::RwLock::new(subscribed_topics),
            shutdown_token: shutdown_guard.disarm(),
            topic_refresh,
            filter_engine,
            #[cfg(feature = "governor")]
            inbound_gate: None,
            #[cfg(feature = "governor")]
            partition_limited_warn: PartitionLimitedDiagnostic::default(),
            #[cfg(feature = "governor")]
            group_id: config.group.clone(),
            #[cfg(all(feature = "governor", feature = "health"))]
            partition_limited_flag: Arc::new(AtomicBool::new(false)),
            #[cfg(all(feature = "governor", feature = "health"))]
            partition_limited_health_registered: AtomicBool::new(false),
        })
    }

    /// Translate `config` into the librdkafka properties shared by consumer and producer.
    ///
    /// `ClientConfig::set` overwrites an existing key, so order matters. The explicit fields
    /// come first, then `build_librdkafka_config()`, then the sizing map. Last come the
    /// security protocol, any SASL/SSL settings that are present, and `client.id`, which
    /// therefore override earlier values for the same key. `statistics.interval.ms` defaults
    /// to 5000 only if nothing earlier set it.
    fn build_client_config(config: &KafkaConfig) -> ClientConfig {
        let mut client_config = ClientConfig::new();
        client_config.set("bootstrap.servers", config.brokers.join(","));
        client_config.set("group.id", &config.group);
        client_config.set("enable.auto.commit", config.enable_auto_commit.to_string());
        client_config.set(
            "auto.commit.interval.ms",
            config.auto_commit_interval_ms.to_string(),
        );
        client_config.set("session.timeout.ms", config.session_timeout_ms.to_string());
        client_config.set(
            "heartbeat.interval.ms",
            config.heartbeat_interval_ms.to_string(),
        );
        client_config.set(
            "max.poll.interval.ms",
            config.max_poll_interval_ms.to_string(),
        );
        client_config.set("fetch.min.bytes", config.fetch_min_bytes.to_string());
        client_config.set("fetch.max.bytes", config.fetch_max_bytes.to_string());
        client_config.set(
            "max.partition.fetch.bytes",
            config.max_partition_fetch_bytes.to_string(),
        );
        client_config.set("auto.offset.reset", &config.auto_offset_reset);
        client_config.set(
            "enable.partition.eof",
            config.enable_partition_eof.to_string(),
        );
        let rdkafka_config = config.build_librdkafka_config();
        for (key, value) in &rdkafka_config {
            client_config.set(key, value);
        }
        for (key, value) in config.sizing.resolved_consumer_map() {
            client_config.set(key, value);
        }
        client_config.set("security.protocol", &config.security_protocol);
        if let Some(ref mechanism) = config.sasl_mechanism {
            client_config.set("sasl.mechanism", mechanism);
        }
        if let Some(ref username) = config.sasl_username {
            client_config.set("sasl.username", username);
        }
        if let Some(ref password) = config.sasl_password {
            client_config.set("sasl.password", password.expose());
        }
        if let Some(ref ca) = config.ssl_ca_location {
            client_config.set("ssl.ca.location", ca);
        }
        if let Some(ref cert) = config.ssl_certificate_location {
            client_config.set("ssl.certificate.location", cert);
        }
        if let Some(ref key) = config.ssl_key_location {
            client_config.set("ssl.key.location", key);
        }
        if config.ssl_skip_verify {
            client_config.set("enable.ssl.certificate.verification", "false");
        }
        client_config.set("client.id", &config.client_id);
        if client_config.get("statistics.interval.ms").is_none() {
            client_config.set("statistics.interval.ms", "5000");
        }
        client_config
    }

    /// Decide the initial topic set and, when configured, start the periodic topic refresh.
    ///
    /// With explicit topics (or auto-discovery disabled), returns `config.topics` with no
    /// refresh handle. Otherwise it discovers topics from the broker and, when
    /// `topic_refresh_secs > 0`, starts a refresh loop on a clone of the returned token.
    ///
    /// # Errors
    /// `TransportError::Config` when discovery returns no topics; resolver errors propagate.
    fn resolve_topics(
        config: &KafkaConfig,
    ) -> TransportResult<(
        Vec<String>,
        Option<parking_lot::Mutex<TopicRefreshHandle>>,
        tokio_util::sync::CancellationToken,
    )> {
        if !config.topics.is_empty() || !config.auto_discover {
            return Ok((
                config.topics.clone(),
                None,
                tokio_util::sync::CancellationToken::new(),
            ));
        }
        tracing::info!("Topics empty -- auto-discovering from broker");
        let resolver = topic_resolver::TopicResolver::new(config)?;
        let discovered = resolver.resolve()?;
        if discovered.is_empty() {
            return Err(TransportError::Config(
                "Auto-discovery found no matching topics".into(),
            ));
        }
        let token = tokio_util::sync::CancellationToken::new();
        let refresh = if config.topic_refresh_secs > 0 {
            let refresh_resolver = topic_resolver::TopicResolver::new(config)?;
            let handle = refresh_resolver.start_refresh_loop(
                Duration::from_secs(config.topic_refresh_secs),
                token.clone(),
            );
            tracing::info!(
                interval_secs = config.topic_refresh_secs,
                "Started periodic topic refresh"
            );
            Some(parking_lot::Mutex::new(handle))
        } else {
            None
        };
        Ok((discovered, refresh, token))
    }

    /// Attach an inbound pressure gate, evaluated at the start of every receive.
    #[cfg(feature = "governor")]
    #[must_use]
    pub fn with_inbound_gate(mut self, gate: crate::governor::InboundGate) -> Self {
        self.inbound_gate = Some(gate);
        self
    }

    /// Whether an inbound gate has been attached.
    #[cfg(feature = "governor")]
    #[must_use]
    pub fn has_inbound_gate(&self) -> bool {
        self.inbound_gate.is_some()
    }

    /// Build a gate actuator that pauses and resumes this transport's consumer assignment.
    #[cfg(feature = "governor")]
    #[must_use]
    pub fn gate_actuator(&self) -> Box<dyn crate::governor::GateActuator> {
        Box::new(KafkaGateActuator {
            consumer: Arc::clone(&self.consumer),
        })
    }

    /// Latest librdkafka statistics collected by the consumer's `StatsContext`.
    #[must_use]
    pub fn stats(&self) -> KafkaMetrics {
        self.consumer.context().get_metrics()
    }

    /// Evaluate and report whether the consumer group is partition-limited.
    ///
    /// The group counts as partition-limited when its member count is at least the total
    /// partition count of the subscribed topics while consumer lag is positive (see
    /// [`partition_limited`]). The verdict is reported in three ways:
    /// - the `kafka_partition_limited` gauge (`metrics` feature);
    /// - the `kafka:partition_limited` health check (`health` feature), which is registered
    ///   on the first limited verdict and reflects the latest one afterwards;
    /// - a warning, logged at most once per cooldown window.
    ///
    /// This is diagnostic only; nothing is reconfigured.
    ///
    /// It makes blocking broker requests (metadata and group list, up to 5 s each), so run it
    /// off the async executor. `spawn_partition_limited_tick` uses `spawn_blocking` for this.
    /// If metadata is unavailable, the consumer's own assignment size stands in for the
    /// partition count. If the group list is unavailable, one member is assumed.
    ///
    /// # Errors
    /// Currently never returns `Err`; the `Result` is kept for API compatibility.
    #[cfg(feature = "governor")]
    pub fn check_partition_limited(&self) -> TransportResult<bool> {
        const BROKER_TIMEOUT: Duration = Duration::from_secs(5);
        let metrics = self.consumer.context().get_metrics();
        let lag = u64::try_from(total_consumer_lag(&metrics).max(0)).unwrap_or(0);
        let partitions = self
            .subscribed_partition_count(BROKER_TIMEOUT)
            .unwrap_or_else(|| self.consumer.assignment().map_or(0, |tpl| tpl.count()));
        // Query only this consumer's group; `None` would list every group on the cluster.
        let members = self
            .consumer
            .fetch_group_list(Some(self.group_id.as_str()), BROKER_TIMEOUT)
            .ok()
            .and_then(|list| list.groups().iter().map(|g| g.members().len()).max())
            .unwrap_or(1);
        let limited = partition_limited(members, partitions, lag);
        #[cfg(feature = "metrics")]
        ::metrics::gauge!("kafka_partition_limited").set(if limited { 1.0 } else { 0.0 });
        #[cfg(feature = "health")]
        self.publish_partition_limited_health(limited);
        if limited
            && self
                .partition_limited_warn
                .should_warn_at(std::time::Instant::now())
        {
            tracing::warn!(
                members,
                partitions,
                lag,
                "kafka consumer group is partition-limited: members >= partitions \
                 with persistent lag -- extra consumers sit idle; the topic needs \
                 more partitions (diagnostic only, no topology change made)"
            );
        }
        Ok(limited)
    }

    /// Total partition count across the currently subscribed topics, from broker metadata.
    ///
    /// Returns `None` when the metadata request fails. Blocks for up to `timeout`.
    #[cfg(feature = "governor")]
    fn subscribed_partition_count(&self, timeout: Duration) -> Option<usize> {
        let metadata = self.consumer.fetch_metadata(None, timeout).ok()?;
        let subscribed = self.subscribed_topics.read();
        Some(
            metadata
                .topics()
                .iter()
                .filter(|topic| subscribed.iter().any(|name| name == topic.name()))
                .map(|topic| topic.partitions().len())
                .sum(),
        )
    }

    /// Publish the partition-limited verdict to the health registry.
    ///
    /// The check is registered once, on the first limited verdict, with a closure that reads
    /// a shared flag. Later verdicts, including recovery, only update the flag, so the health
    /// status no longer sticks at Degraded.
    #[cfg(all(feature = "governor", feature = "health"))]
    fn publish_partition_limited_health(&self, limited: bool) {
        self.partition_limited_flag.store(limited, Ordering::Relaxed);
        if limited
            && !self
                .partition_limited_health_registered
                .swap(true, Ordering::Relaxed)
        {
            let flag = Arc::clone(&self.partition_limited_flag);
            crate::health::HealthRegistry::register("kafka:partition_limited", move || {
                if flag.load(Ordering::Relaxed) {
                    crate::health::HealthStatus::Degraded
                } else {
                    crate::health::HealthStatus::Healthy
                }
            });
        }
    }

    /// Run [`Self::check_partition_limited`] every `interval` until `shutdown` is cancelled.
    ///
    /// The first check runs one full interval after spawning. Each check runs on the
    /// blocking thread pool because it makes blocking broker calls, and missed ticks are
    /// delayed rather than bursted. A zero `interval` (which `tokio::time::interval` rejects
    /// by panicking) logs a warning and starts nothing. Must be called within a Tokio runtime.
    #[cfg(feature = "governor")]
    pub fn spawn_partition_limited_tick(
        self: Arc<Self>,
        interval: Duration,
        shutdown: tokio_util::sync::CancellationToken,
    ) {
        if interval.is_zero() {
            tracing::warn!(
                "partition-limited diagnostic not started: tick interval must be non-zero"
            );
            return;
        }
        tokio::spawn(async move {
            let mut tick = tokio::time::interval(interval);
            tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            // The first tick completes immediately; consume it so checks start one interval in.
            tick.tick().await;
            loop {
                tokio::select! {
                    () = shutdown.cancelled() => break,
                    _ = tick.tick() => {
                        let this = Arc::clone(&self);
                        let outcome = tokio::task::spawn_blocking(move || {
                            this.check_partition_limited().map_err(|e| e.to_string())
                        })
                        .await;
                        match outcome {
                            Ok(Ok(_)) => {}
                            Ok(Err(e)) => {
                                tracing::debug!(error = %e, "partition-limited diagnostic tick failed");
                            }
                            Err(e) => {
                                tracing::debug!(
                                    error = %e,
                                    "partition-limited diagnostic task did not complete"
                                );
                            }
                        }
                    }
                }
            }
        });
    }
}

impl Drop for KafkaTransport {
    /// Cancel the shutdown token if the transport is dropped without `close()`.
    ///
    /// The topic-refresh loop is started with a clone of this token and is presumably stopped
    /// by its cancellation, so it does not outlive the transport. Cancelling an
    /// already-cancelled token is a no-op.
    fn drop(&mut self) {
        self.shutdown_token.cancel();
    }
}
impl TransportBase for KafkaTransport {
async fn close(&self) -> TransportResult<()> {
self.closed.store(true, Ordering::Relaxed);
self.healthy.store(false, Ordering::Relaxed);
self.shutdown_token.cancel();
Ok(())
}
fn is_healthy(&self) -> bool {
self.healthy.load(Ordering::Relaxed)
}
fn name(&self) -> &'static str {
"kafka"
}
}
impl TransportSender for KafkaTransport {
async fn send(&self, key: &str, payload: bytes::Bytes) -> SendResult {
if self.closed.load(Ordering::Relaxed) {
return SendResult::Fatal(TransportError::Closed);
}
if self.filter_engine.has_outbound_filters() {
match self.filter_engine.apply_outbound(&payload) {
super::filter::FilterDisposition::Pass => {}
super::filter::FilterDisposition::Drop => return SendResult::Ok,
super::filter::FilterDisposition::Dlq => return SendResult::FilteredDlq,
}
}
let record: FutureRecord<'_, str, [u8]> = FutureRecord::to(key).payload(payload.as_ref());
#[cfg(feature = "transport-trace")]
let record = if let Some(tp) = super::propagation::current_traceparent() {
let headers = rdkafka::message::OwnedHeaders::new().insert(rdkafka::message::Header {
key: super::propagation::TRACEPARENT_HEADER,
value: Some(tp.as_str()),
});
record.headers(headers)
} else {
record
};
#[cfg(feature = "metrics")]
let start = std::time::Instant::now();
let result = match self
.producer
.send(record, Timeout::After(Duration::from_secs(5)))
.await
{
Ok(_) => {
#[cfg(feature = "metrics")]
::metrics::counter!("dfe_transport_sent_total", "transport" => "kafka")
.increment(1);
SendResult::Ok
}
Err((err, _)) => {
let err_str = err.to_string();
if err_str.contains("queue full") || err_str.contains("Local: Queue full") {
#[cfg(feature = "metrics")]
::metrics::counter!(
"dfe_transport_backpressured_total",
"transport" => "kafka"
)
.increment(1);
SendResult::Backpressured
} else {
#[cfg(feature = "metrics")]
::metrics::counter!(
"dfe_transport_send_errors_total",
"transport" => "kafka"
)
.increment(1);
SendResult::Fatal(TransportError::Send(err_str))
}
}
};
#[cfg(feature = "metrics")]
::metrics::histogram!(
"dfe_transport_send_duration_seconds",
"transport" => "kafka"
)
.record(start.elapsed().as_secs_f64());
result
}
}
impl TransportReceiver for KafkaTransport {
    type Token = KafkaToken;

    /// Receive a batch of at most `max` records, with no byte budget.
    async fn recv(&self, max: usize) -> TransportResult<WorkBatch<Self::Token>> {
        let no_byte_budget: Option<u64> = None;
        self.recv_inner(max, no_byte_budget).await
    }

    /// Receive a batch bounded by `limits.max_records` records and `limits.max_bytes`
    /// payload bytes.
    async fn recv_limited(
        &self,
        limits: super::traits::RecvLimits,
    ) -> TransportResult<WorkBatch<Self::Token>> {
        let record_cap = limits.max_records;
        let byte_cap = Some(limits.max_bytes);
        self.recv_inner(record_cap, byte_cap).await
    }

    /// Synchronously commit, for every (topic, partition) in `tokens`, the offset one past
    /// the highest token offset. An empty slice commits nothing.
    ///
    /// # Errors
    /// `TransportError::Commit` if the commit list cannot be built or the commit fails.
    async fn commit(&self, tokens: &[Self::Token]) -> TransportResult<()> {
        if !tokens.is_empty() {
            let offsets = build_commit_tpl(tokens)?;
            self.consumer
                .commit(&offsets, CommitMode::Sync)
                .map_err(|e| TransportError::Commit(e.to_string()))?;
        }
        Ok(())
    }
}
impl KafkaTransport {
#[allow(clippy::too_many_lines)]
async fn recv_inner(
&self,
max_msgs: usize,
max_bytes: Option<u64>,
) -> TransportResult<WorkBatch<KafkaToken>> {
if self.closed.load(Ordering::Relaxed) {
return Err(TransportError::Closed);
}
#[cfg(feature = "governor")]
if let Some(ref gate) = self.inbound_gate {
let _ = gate.evaluate();
}
if let Some(ref refresh) = self.topic_refresh
&& let Some(new_topics) = refresh.lock().check_changed()
{
let topics: Vec<&str> = new_topics.iter().map(String::as_str).collect();
match self.consumer.subscribe(&topics) {
Ok(()) => {
tracing::info!(?new_topics, "Re-subscribed after topic refresh");
*self.subscribed_topics.write() = new_topics;
}
Err(e) => {
tracing::warn!(error = %e, "Failed to re-subscribe after topic refresh");
}
}
}
let timeout = Duration::from_millis(tuning::POLL_TIMEOUT_MS);
#[cfg(feature = "metrics")]
let poll_start = std::time::Instant::now();
let span_cap = max_msgs.min(tuning::INITIAL_BATCH_CAPACITY);
let mut spans: Vec<Span> = Vec::with_capacity(span_cap);
let arena_hint = match max_bytes {
Some(cap) => usize::try_from(cap)
.unwrap_or(usize::MAX)
.saturating_add(256),
None => span_cap.saturating_mul(256),
};
let mut arena: Vec<u8> = Vec::with_capacity(arena_hint);
let drain_deadline =
std::time::Instant::now() + Duration::from_millis(tuning::MAX_DRAIN_MS);
if let Some(result) = self.consumer.poll(timeout) {
match result {
Ok(msg) => {
#[cfg(feature = "transport-trace")]
if let Some(headers) = msg.headers() {
use rdkafka::message::Headers;
for idx in 0..headers.count() {
if let Some(Ok(header)) = headers.try_get_as::<[u8]>(idx)
&& header.key == super::propagation::TRACEPARENT_HEADER
{
if let Some(value) = header.value
&& let Ok(tp) = std::str::from_utf8(value)
&& super::propagation::is_valid_traceparent(tp)
{
tracing::Span::current().record("traceparent", tp);
}
break;
}
}
}
let topic_str = msg.topic();
let topic: Arc<str> = get_or_insert_topic(&self.topic_cache, topic_str);
let start = arena.len();
arena.extend_from_slice(msg.payload().unwrap_or(&[]));
let end = arena.len();
let partition = msg.partition();
let offset = msg.offset();
let timestamp_ms = msg.timestamp().to_millis();
spans.push(Span {
key: Some(topic.clone()),
token: KafkaToken::new(topic, partition, offset),
timestamp_ms,
format: PayloadFormat::Auto,
range: start..end,
});
}
Err(e) => {
return Err(TransportError::Recv(e.to_string()));
}
}
} else {
#[cfg(feature = "metrics")]
::metrics::histogram!("kafka_poll_duration_seconds")
.record(poll_start.elapsed().as_secs_f64());
return Ok(RecvBatch::from_messages(build_batch_from_spans(
bytes::Bytes::new(),
spans,
))
.into());
}
while spans.len() < max_msgs {
if std::time::Instant::now() >= drain_deadline {
break;
}
if arena_byte_limit_reached(arena.len(), spans.len(), max_bytes) {
break;
}
match self.consumer.poll(Duration::ZERO) {
Some(Ok(msg)) => {
let topic_str = msg.topic();
let topic: Arc<str> = get_or_insert_topic(&self.topic_cache, topic_str);
let start = arena.len();
arena.extend_from_slice(msg.payload().unwrap_or(&[]));
let end = arena.len();
let partition = msg.partition();
let offset = msg.offset();
let timestamp_ms = msg.timestamp().to_millis();
spans.push(Span {
key: Some(topic.clone()),
token: KafkaToken::new(topic, partition, offset),
timestamp_ms,
format: PayloadFormat::Auto,
range: start..end,
});
}
Some(Err(e)) => {
if spans.is_empty() {
return Err(TransportError::Recv(e.to_string()));
}
break;
}
None => break,
}
}
let arena: bytes::Bytes = bytes::Bytes::from(arena);
let messages = build_batch_from_spans(arena, spans);
let batch =
self.filter_engine
.partition_batch(messages, |m| m.payload.as_ref(), |m| m.key.clone());
let messages = batch.messages;
let dlq_entries = batch.dlq_entries;
Ok(RecvBatch {
messages,
dlq_entries,
}
.into())
}
pub fn commit_weak_async(&self, tokens: &[KafkaToken]) -> TransportResult<()> {
if tokens.is_empty() {
return Ok(());
}
let tpl = build_commit_tpl(tokens)?;
self.consumer
.commit(&tpl, CommitMode::Async)
.map_err(|e| TransportError::Commit(e.to_string()))?;
Ok(())
}
}
/// Reduce `tokens` to the highest offset seen per (topic, partition).
///
/// Token order does not matter; only the maximum offset of each key is kept. The map is
/// pre-sized for at most 1024 keys.
fn highest_offsets_per_partition(tokens: &[KafkaToken]) -> HashMap<(Arc<str>, i32), i64> {
    let initial: HashMap<(Arc<str>, i32), i64> = HashMap::with_capacity(tokens.len().min(1024));
    tokens.iter().fold(initial, |mut acc, tok| {
        let best = acc
            .entry((Arc::clone(&tok.topic), tok.partition))
            .or_insert(tok.offset);
        *best = (*best).max(tok.offset);
        acc
    })
}
/// Build the commit list for `tokens`: for each (topic, partition), the offset one past the
/// highest token offset (the next offset to read).
///
/// # Errors
/// `TransportError::Commit` if a partition cannot be added to the list.
fn build_commit_tpl(tokens: &[KafkaToken]) -> TransportResult<TopicPartitionList> {
    let mut list = TopicPartitionList::new();
    highest_offsets_per_partition(tokens)
        .into_iter()
        .try_for_each(|((topic, partition), highest)| {
            list.add_partition_offset(&topic, partition, Offset::Offset(highest + 1))
                .map_err(|e| TransportError::Commit(format!("Failed to build TPL: {e}")))
        })?;
    Ok(list)
}
/// Whether the byte budget stops further draining.
///
/// Without a budget this is never true. With one, it becomes true only once at least one
/// record is buffered and the arena has reached the cap, so a single oversized record can
/// still be taken.
fn arena_byte_limit_reached(arena_len: usize, span_count: usize, max_bytes: Option<u64>) -> bool {
    let Some(cap) = max_bytes else {
        return false;
    };
    if span_count == 0 {
        return false;
    }
    u64::try_from(arena_len).unwrap_or(u64::MAX) >= cap
}
/// Position of one received record inside the shared payload arena, plus its metadata.
struct Span {
    /// Message key; the receive path sets it to the record's topic.
    key: Option<Arc<str>>,
    /// Commit token (topic, partition, offset) for the record.
    token: KafkaToken,
    /// Record timestamp in milliseconds, as returned by `Timestamp::to_millis`.
    timestamp_ms: Option<i64>,
    /// Payload format hint.
    format: PayloadFormat,
    /// Byte range of the payload within the arena.
    range: std::ops::Range<usize>,
}
fn build_batch_from_spans(arena: bytes::Bytes, spans: Vec<Span>) -> Vec<Message<KafkaToken>> {
spans
.into_iter()
.map(|span| Message {
key: span.key,
payload: arena.slice(span.range),
token: span.token,
timestamp_ms: span.timestamp_ms,
format: span.format,
})
.collect()
}
/// Return the interned `Arc<str>` for `topic`, inserting it on first sight.
///
/// The fast path takes only a shared read lock. On a miss, the write lock is taken and the
/// entry API re-checks the map. If another thread interned the topic in between, its `Arc`
/// is returned rather than overwritten, so every caller sees one allocation per topic.
#[inline]
fn get_or_insert_topic(
    cache: &parking_lot::RwLock<HashMap<String, Arc<str>>>,
    topic: &str,
) -> Arc<str> {
    if let Some(existing) = cache.read().get(topic) {
        return Arc::clone(existing);
    }
    let mut map = cache.write();
    Arc::clone(
        map.entry(topic.to_owned())
            .or_insert_with(|| Arc::from(topic)),
    )
}
/// Gate actuator that pauses and resumes every partition currently assigned to the consumer.
///
/// NOTE(review): only the assignment at call time is affected. Partitions assigned by a
/// later rebalance are not paused by an earlier `pause()` — confirm that is acceptable.
#[cfg(feature = "governor")]
struct KafkaGateActuator {
    consumer: Arc<BaseConsumer<StatsContext>>,
}
#[cfg(feature = "governor")]
impl crate::governor::GateActuator for KafkaGateActuator {
    /// Pause fetching on the current assignment; failures are logged, not propagated.
    fn pause(&self) {
        let assignment = match self.consumer.assignment() {
            Ok(tpl) => tpl,
            Err(e) => {
                tracing::warn!(error = %e, "kafka gate: assignment() failed on pause");
                return;
            }
        };
        if let Err(e) = self.consumer.pause(&assignment) {
            tracing::warn!(error = %e, "kafka gate: pause(assignment) failed");
        }
    }

    /// Resume fetching on the current assignment; failures are logged, not propagated.
    fn resume(&self) {
        let assignment = match self.consumer.assignment() {
            Ok(tpl) => tpl,
            Err(e) => {
                tracing::warn!(error = %e, "kafka gate: assignment() failed on resume");
                return;
            }
        };
        if let Err(e) = self.consumer.resume(&assignment) {
            tracing::warn!(error = %e, "kafka gate: resume(assignment) failed");
        }
    }
}
/// True when adding consumers cannot help. That is the case when partition information is
/// present (`partitions > 0`), there is lag, and the group already has at least one member
/// per partition.
#[cfg(feature = "governor")]
#[must_use]
pub fn partition_limited(members: usize, partitions: usize, lag: u64) -> bool {
    let has_partitions = partitions != 0;
    let has_lag = lag != 0;
    has_partitions && has_lag && members >= partitions
}
/// Rate limiter for the partition-limited warning: at most one warning per `cooldown`.
#[cfg(feature = "governor")]
struct PartitionLimitedDiagnostic {
    /// When the last warning was allowed through; `None` until the first one.
    last_warn: parking_lot::Mutex<Option<std::time::Instant>>,
    /// Minimum spacing between two allowed warnings.
    cooldown: Duration,
}
#[cfg(feature = "governor")]
impl Default for PartitionLimitedDiagnostic {
    /// No warning recorded yet, with a 300-second cooldown.
    fn default() -> Self {
        #[allow(clippy::duration_suboptimal_units)]
        let cooldown = Duration::from_secs(300);
        Self {
            last_warn: parking_lot::Mutex::new(None),
            cooldown,
        }
    }
}
#[cfg(feature = "governor")]
impl PartitionLimitedDiagnostic {
    /// Return `true` and restart the window at `now` if no warning was allowed within the
    /// last `cooldown`. Otherwise return `false` and leave the window untouched.
    fn should_warn_at(&self, now: std::time::Instant) -> bool {
        let mut slot = self.last_warn.lock();
        let previous: Option<std::time::Instant> = *slot;
        let within_cooldown =
            previous.is_some_and(|prev| now.duration_since(prev) < self.cooldown);
        if within_cooldown {
            return false;
        }
        *slot = Some(now);
        true
    }
}
impl std::fmt::Debug for KafkaTransport {
    /// Show the subscribed topics and state flags; clients, caches and handles are elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let topics = self.subscribed_topics.read();
        let closed = self.closed.load(Ordering::Relaxed);
        let healthy = self.healthy.load(Ordering::Relaxed);
        let refresh_active = self.topic_refresh.is_some();
        f.debug_struct("KafkaTransport")
            .field("subscribed_topics", &*topics)
            .field("closed", &closed)
            .field("healthy", &healthy)
            .field("topic_refresh_active", &refresh_active)
            .finish_non_exhaustive()
    }
}
#[cfg(test)]
mod tests {
    // Unit tests for this module's helpers, config defaults and topic-refresh handle.
    // None of these tests constructs a `KafkaTransport`.
    use super::*;
    // Pins the tuning defaults so any change to them is deliberate.
    #[test]
    fn test_tuning_constants() {
        assert_eq!(tuning::DEFAULT_BATCH_SIZE, 10_000);
        assert_eq!(tuning::MAX_DRAIN_MS, 100);
        assert_eq!(tuning::POLL_TIMEOUT_MS, 50);
    }
    // A pre-seeded topic is served from the cache as the same allocation on every lookup.
    #[test]
    fn test_get_or_insert_topic_cached() {
        let mut map = HashMap::new();
        map.insert("events".to_string(), Arc::from("events"));
        let cache = parking_lot::RwLock::new(map);
        let arc1 = get_or_insert_topic(&cache, "events");
        let arc2 = get_or_insert_topic(&cache, "events");
        assert!(Arc::ptr_eq(&arc1, &arc2));
    }
    // A miss inserts the topic into the cache; later lookups return that same Arc.
    #[test]
    fn test_get_or_insert_topic_new() {
        let cache = parking_lot::RwLock::new(HashMap::new());
        let arc = get_or_insert_topic(&cache, "new-topic");
        assert_eq!(&*arc, "new-topic");
        assert!(cache.read().contains_key("new-topic"));
        let arc2 = get_or_insert_topic(&cache, "new-topic");
        assert!(Arc::ptr_eq(&arc, &arc2));
    }
    // Default config: fetch.max.bytes of 52_428_800 (50 * 1024 * 1024) and auto-commit off.
    #[test]
    fn test_kafka_config_defaults() {
        let config = KafkaConfig::default();
        assert_eq!(config.fetch_max_bytes, 52_428_800); assert!(!config.enable_auto_commit); }
    // `check_changed` yields the new topic list once per update and `None` otherwise.
    #[tokio::test]
    async fn test_topic_refresh_check_changed_detects_updates() {
        let (tx, rx) = tokio::sync::watch::channel(vec!["events_load".to_string()]);
        let mut handle = topic_resolver::TopicRefreshHandle::new_for_test(rx);
        assert!(handle.check_changed().is_none());
        tx.send(vec!["events_load".to_string(), "logs_load".to_string()])
            .unwrap();
        let changed = handle.check_changed();
        assert!(changed.is_some());
        let topics = changed.unwrap();
        assert_eq!(topics.len(), 2);
        assert!(topics.contains(&"logs_load".to_string()));
        assert!(handle.check_changed().is_none());
    }
    // Asserts that `slice` points inside `blob`'s buffer, i.e. it is a view rather than a copy.
    fn assert_within(slice: &bytes::Bytes, blob: &bytes::Bytes) {
        let blob_start = blob.as_ptr() as usize;
        let blob_end = blob_start + blob.len();
        let slice_start = slice.as_ptr() as usize;
        let slice_end = slice_start + slice.len();
        assert!(
            slice_start >= blob_start && slice_end <= blob_end,
            "slice [{slice_start:#x}, {slice_end:#x}) is not within arena \
             [{blob_start:#x}, {blob_end:#x}) -- it is a copy, not a view"
        );
    }
    // Builds an arena plus spans: payloads appended back-to-back, one span each, with topic
    // "events", partition 0, offset = index and timestamp = 1000 + index.
    fn arena_with(payloads: &[&[u8]]) -> (bytes::Bytes, Vec<Span>) {
        let mut arena: Vec<u8> = Vec::new();
        let mut spans: Vec<Span> = Vec::new();
        for (i, p) in payloads.iter().enumerate() {
            let start = arena.len();
            arena.extend_from_slice(p);
            let end = arena.len();
            let offset = i64::try_from(i).expect("test index fits i64");
            spans.push(Span {
                key: Some(Arc::from("events")),
                token: KafkaToken::new(Arc::from("events"), 0, offset),
                timestamp_ms: Some(1_000 + offset),
                format: PayloadFormat::Auto,
                range: start..end,
            });
        }
        (bytes::Bytes::from(arena), spans)
    }
    // Messages come back in span order with the right payload, offset and timestamp.
    #[test]
    fn build_batch_payloads_match_and_in_order() {
        let payloads: &[&[u8]] = &[b"{\"a\":1}", b"hello world", b"[1,2,3]"];
        let (arena, spans) = arena_with(payloads);
        let msgs = build_batch_from_spans(arena, spans);
        assert_eq!(msgs.len(), 3);
        for (i, expected) in payloads.iter().enumerate() {
            assert_eq!(msgs[i].payload.as_ref(), *expected, "payload {i} mismatch");
            let offset = i64::try_from(i).expect("test index fits i64");
            assert_eq!(msgs[i].token.offset, offset);
            assert_eq!(msgs[i].timestamp_ms, Some(1_000 + offset));
        }
    }
    // Every payload must point into the shared arena buffer (zero-copy slicing).
    #[test]
    fn build_batch_payloads_are_views_into_shared_arena() {
        let payloads: &[&[u8]] = &[b"first-record", b"second", b"third-payload-xyz"];
        let (arena, spans) = arena_with(payloads);
        let arena_ref = arena.clone();
        let msgs = build_batch_from_spans(arena, spans);
        for m in &msgs {
            assert_within(&m.payload, &arena_ref);
        }
        let base = arena_ref.as_ptr() as usize;
        for m in &msgs {
            let off = m.payload.as_ptr() as usize - base;
            assert!(off < arena_ref.len() || m.payload.is_empty());
        }
    }
    // A zero-length span produces an empty payload without disturbing its neighbours.
    #[test]
    fn build_batch_empty_payload_span_yields_empty_slice() {
        let payloads: &[&[u8]] = &[b"before", b"", b"after"];
        let (arena, spans) = arena_with(payloads);
        let msgs = build_batch_from_spans(arena, spans);
        assert_eq!(msgs.len(), 3);
        assert_eq!(msgs[0].payload.as_ref(), b"before");
        assert!(
            msgs[1].payload.is_empty(),
            "empty span must yield empty slice"
        );
        assert_eq!(msgs[2].payload.as_ref(), b"after");
    }
    // No spans -> no messages.
    #[test]
    fn build_batch_no_spans_yields_empty_batch() {
        let msgs = build_batch_from_spans(bytes::Bytes::new(), Vec::new());
        assert!(msgs.is_empty());
    }
    // Key, token and a non-default format are carried from the span to the message.
    #[test]
    fn build_batch_preserves_key_token_format() {
        let (arena, mut spans) = arena_with(&[b"{\"k\":1}"]);
        spans[0].format = PayloadFormat::Json;
        let msgs = build_batch_from_spans(arena, spans);
        assert_eq!(msgs[0].key.as_deref(), Some("events"));
        assert_eq!(msgs[0].token.topic.as_ref(), "events");
        assert_eq!(msgs[0].format, PayloadFormat::Json);
    }
    // Without a byte budget, the byte check never stops draining.
    #[test]
    fn arena_stop_record_bounded_when_no_byte_cap() {
        assert!(!arena_byte_limit_reached(10_000_000, 5, None));
        assert!(!arena_byte_limit_reached(0, 0, None));
    }
    // With a budget but no buffered records, draining must continue.
    #[test]
    fn arena_stop_floors_at_one_record() {
        assert!(
            !arena_byte_limit_reached(50_000, 0, Some(1024)),
            "floor: must take at least one record before a byte cap can stop"
        );
    }
    // With records buffered, the budget stops draining once arena_len >= cap.
    #[test]
    fn arena_stop_when_cap_reached_with_records() {
        assert!(
            arena_byte_limit_reached(1024, 1, Some(1024)),
            "arena_len == cap with a record present -> stop"
        );
        assert!(
            arena_byte_limit_reached(2048, 3, Some(1024)),
            "arena_len > cap with records present -> stop"
        );
        assert!(
            !arena_byte_limit_reached(512, 2, Some(1024)),
            "arena_len < cap -> keep draining toward the budget"
        );
    }
    // The per-partition maximum wins regardless of token order.
    #[test]
    fn highest_offsets_picks_max_per_partition() {
        let topic: Arc<str> = Arc::from("events");
        let tokens = vec![
            KafkaToken::new(Arc::clone(&topic), 0, 5),
            KafkaToken::new(Arc::clone(&topic), 1, 3),
            KafkaToken::new(Arc::clone(&topic), 0, 2),
            KafkaToken::new(Arc::clone(&topic), 0, 9),
            KafkaToken::new(Arc::clone(&topic), 1, 1),
            KafkaToken::new(Arc::clone(&topic), 0, 7),
        ];
        let map = highest_offsets_per_partition(&tokens);
        assert_eq!(map.len(), 2, "two partitions");
        assert_eq!(
            map.get(&(Arc::clone(&topic), 0)),
            Some(&9),
            "partition 0 keeps the highest offset 9, not the last-seen 7"
        );
        assert_eq!(
            map.get(&(Arc::clone(&topic), 1)),
            Some(&3),
            "partition 1 keeps the highest offset 3"
        );
    }
    // Same partition number on different topics is tracked as separate keys.
    #[test]
    fn highest_offsets_separates_distinct_topics() {
        let a: Arc<str> = Arc::from("topic-a");
        let b: Arc<str> = Arc::from("topic-b");
        let tokens = vec![
            KafkaToken::new(Arc::clone(&a), 0, 10),
            KafkaToken::new(Arc::clone(&b), 0, 4),
            KafkaToken::new(Arc::clone(&a), 0, 11),
        ];
        let map = highest_offsets_per_partition(&tokens);
        assert_eq!(map.len(), 2, "two (topic, partition) keys");
        assert_eq!(map.get(&(a, 0)), Some(&11));
        assert_eq!(map.get(&(b, 0)), Some(&4));
    }
    // No tokens -> empty map.
    #[test]
    fn highest_offsets_empty_tokens_yield_empty_map() {
        let map = highest_offsets_per_partition(&[]);
        assert!(map.is_empty());
    }
    // The commit list stores highest + 1 per partition (the next offset to read).
    #[test]
    fn build_commit_tpl_stores_highest_plus_one() {
        let topic: Arc<str> = Arc::from("events");
        let tokens = vec![
            KafkaToken::new(Arc::clone(&topic), 0, 5),
            KafkaToken::new(Arc::clone(&topic), 0, 9),
            KafkaToken::new(Arc::clone(&topic), 1, 3),
        ];
        let tpl = build_commit_tpl(&tokens).expect("valid tpl");
        let e0 = tpl
            .find_partition("events", 0)
            .expect("partition 0 present");
        assert_eq!(
            e0.offset(),
            Offset::Offset(10),
            "p0 commits highest(9) + 1 = 10 (next-to-read)"
        );
        let e1 = tpl
            .find_partition("events", 1)
            .expect("partition 1 present");
        assert_eq!(
            e1.offset(),
            Offset::Offset(4),
            "p1 commits highest(3) + 1 = 4 (next-to-read)"
        );
    }
    // Truth table of `partition_limited` over members / partitions / lag.
    #[cfg(feature = "governor")]
    #[test]
    fn partition_limited_truth_table() {
        assert!(
            partition_limited(4, 4, 10),
            "equal members+partitions, lag>0"
        );
        assert!(partition_limited(6, 4, 1), "more members than partitions");
        assert!(
            !partition_limited(2, 4, 10),
            "fewer members -> can scale out"
        );
        assert!(!partition_limited(8, 4, 0), "no lag -> not limited");
        assert!(
            !partition_limited(4, 0, 10),
            "no partition info -> not limited"
        );
        assert!(!partition_limited(0, 0, 0), "all zero -> not limited");
    }
    // The warning fires once, stays suppressed within the 300 s cooldown, then re-arms.
    #[cfg(feature = "governor")]
    #[test]
    fn partition_limited_warn_dedups_within_window() {
        use std::time::{Duration, Instant};
        let diag = PartitionLimitedDiagnostic {
            last_warn: parking_lot::Mutex::new(None),
            #[allow(clippy::duration_suboptimal_units)]
            cooldown: Duration::from_secs(300),
        };
        let t0 = Instant::now();
        assert!(diag.should_warn_at(t0), "first warning fires");
        assert!(
            !diag.should_warn_at(t0 + Duration::from_secs(10)),
            "second warning within cooldown is suppressed"
        );
        assert!(
            !diag.should_warn_at(t0 + Duration::from_secs(299)),
            "still within cooldown -> suppressed"
        );
        assert!(
            diag.should_warn_at(t0 + Duration::from_secs(301)),
            "after cooldown the warning re-fires once"
        );
        assert!(
            !diag.should_warn_at(t0 + Duration::from_secs(305)),
            "new window re-armed; immediate repeat suppressed"
        );
    }
    // Drives an `InboundGate` through a pressure rise and fall using a mock source. Checks
    // that the actuator's pause/resume fire once per edge, not once per `evaluate()` call.
    // Statement order matters here: each `evaluate()` observes the pressure stored just
    // before it.
    #[cfg(feature = "governor")]
    #[test]
    fn inbound_gate_edge_wiring_drives_actuator_once_per_edge() {
        use crate::governor::{Admit, GateActuator, Hysteresis, InboundGate, UnifiedPressure};
        use crate::governor::{Pressure, PressureSource};
        use std::sync::atomic::{AtomicU64, AtomicUsize};
        // Pressure source whose value is an f64 stored as raw bits in an AtomicU64.
        struct MockSource(AtomicU64);
        impl PressureSource for MockSource {
            fn name(&self) -> &'static str {
                "mock"
            }
            fn sample(&self) -> Pressure {
                Pressure::new(f64::from_bits(self.0.load(Ordering::Relaxed)))
            }
            fn is_hard(&self) -> bool {
                true
            }
        }
        // Counts actuator invocations.
        struct Counter {
            pauses: AtomicUsize,
            resumes: AtomicUsize,
        }
        // Actuator that forwards pause/resume calls to a shared `Counter`.
        struct Forward(Arc<Counter>);
        impl GateActuator for Forward {
            fn pause(&self) {
                self.0.pauses.fetch_add(1, Ordering::Relaxed);
            }
            fn resume(&self) {
                self.0.resumes.fetch_add(1, Ordering::Relaxed);
            }
        }
        let src = Arc::new(MockSource(AtomicU64::new(0.1_f64.to_bits())));
        let pressure = Arc::new(UnifiedPressure::new(
            vec![Arc::clone(&src) as Arc<dyn PressureSource>],
            Hysteresis::new(0.80, 0.65).expect("valid band"),
        ));
        let counter = Arc::new(Counter {
            pauses: AtomicUsize::new(0),
            resumes: AtomicUsize::new(0),
        });
        let gate = InboundGate::new(
            Arc::clone(&pressure),
            Box::new(Forward(Arc::clone(&counter))),
        );
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert_eq!(counter.pauses.load(Ordering::Relaxed), 0);
        src.0.store(0.95_f64.to_bits(), Ordering::Relaxed);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert_eq!(gate.evaluate(), Admit::Hold);
        assert_eq!(
            counter.pauses.load(Ordering::Relaxed),
            1,
            "pause once per edge"
        );
        src.0.store(0.10_f64.to_bits(), Ordering::Relaxed);
        assert_eq!(gate.evaluate(), Admit::Yes);
        assert_eq!(
            counter.resumes.load(Ordering::Relaxed),
            1,
            "resume once per edge"
        );
    }
    // Mirrors how `subscribed_topics` is replaced after a refresh: a write-lock swap that
    // readers then observe.
    #[test]
    fn test_subscribed_topics_rwlock_update() {
        let topics = parking_lot::RwLock::new(vec!["events_load".to_string()]);
        assert_eq!(topics.read().len(), 1);
        *topics.write() = vec!["events_load".to_string(), "logs_load".to_string()];
        assert_eq!(topics.read().len(), 2);
        assert_eq!(topics.read()[1], "logs_load");
    }
}