use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use bytes::Bytes;
use rand::Rng;
use tokio::sync::watch;
use tracing::{debug, info, warn};
use crate::error::ErrorCode;
use crate::metrics::{KrafkaMetrics, MetricsExporter};
use crate::network::BrokerConnection;
use crate::protocol::{
ApiKey, GetTelemetrySubscriptionsRequest, GetTelemetrySubscriptionsResponse,
PushTelemetryRequest, PushTelemetryResponse, VersionedDecode,
};
use super::otlp::OtlpExporter;
/// Number of retries after the first attempt (so `MAX_RETRIES + 1` attempts in total).
const MAX_RETRIES: u32 = 3;
/// Backoff before the first retry; doubled for each subsequent retry.
const RETRY_BACKOFF_BASE: Duration = Duration::from_millis(1_000);
/// Lower bound applied to the broker-supplied push interval (100 ms).
const MIN_PUSH_INTERVAL_MS: i32 = 100;
/// Upper bound applied to the broker-supplied push interval (one hour).
const MAX_PUSH_INTERVAL_MS: i32 = 3_600_000;

/// Exponential backoff for a 1-based retry `attempt`: 1s, 2s, 4s, ...
///
/// The multiplier saturates at `u32::MAX` instead of overflowing.
fn retry_backoff(attempt: u32) -> Duration {
    debug_assert!(attempt > 0, "retry_backoff expects attempts starting at 1");
    let doublings = attempt.saturating_sub(1);
    let multiplier = 2u32.saturating_pow(doublings);
    RETRY_BACKOFF_BASE * multiplier
}

/// Forces a broker-supplied push interval (in ms) into
/// `[MIN_PUSH_INTERVAL_MS, MAX_PUSH_INTERVAL_MS]`; zero and negative values become the minimum.
fn clamp_push_interval_ms(raw_ms: i32) -> i32 {
    raw_ms.max(MIN_PUSH_INTERVAL_MS).min(MAX_PUSH_INTERVAL_MS)
}
/// User-facing configuration for KIP-714 client telemetry.
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct TelemetryConfig {
    /// When `false`, the reporter returns immediately without contacting the broker.
    pub enabled: bool,
    /// Prefix handed to the metrics registry's `export_all_with_prefix` on every push.
    pub metrics_prefix: String,
    /// Extra resource attributes attached to every exported payload, as `(key, value)` pairs.
    pub resource_attributes: Vec<(String, String)>,
}

impl Default for TelemetryConfig {
    /// Telemetry enabled, `org.apache.kafka` metric prefix, no extra resource attributes.
    fn default() -> Self {
        let metrics_prefix = String::from("org.apache.kafka");
        TelemetryConfig {
            enabled: true,
            metrics_prefix,
            resource_attributes: vec![],
        }
    }
}
/// Subscription parameters returned by a successful `GetTelemetrySubscriptions` call.
#[derive(Debug, Clone)]
struct Subscription {
    /// Client instance id; assigned by the broker when the request carried an all-zero id.
    client_instance_id: [u8; 16],
    /// Subscription id sent back with every `PushTelemetry` request.
    subscription_id: i32,
    /// Interval between pushes, already clamped to the supported bounds.
    push_interval: Duration,
    /// Whether counters should be reported as deltas rather than running totals.
    delta_temporality: bool,
    /// Broker payload limit; pushes are only size-checked against it when it is positive.
    telemetry_max_bytes: i32,
    /// Metric-name prefixes requested by the broker; a lone `"*"` selects every metric.
    requested_metrics: Vec<String>,
}

impl Subscription {
    /// `true` when the broker requested at least one metric prefix.
    fn has_metrics(&self) -> bool {
        !self.requested_metrics.is_empty()
    }

    /// `true` only when the request list is exactly `["*"]`.
    fn wants_all_metrics(&self) -> bool {
        matches!(self.requested_metrics.as_slice(), [only] if only.as_str() == "*")
    }
}
/// Background task that obtains a KIP-714 telemetry subscription from a broker and
/// periodically pushes the client's metrics to it.
///
/// Build with `TelemetryReporter::new` and drive with `run`.
// NOTE: field order is also drop order; keep it stable.
pub struct TelemetryReporter {
/// Connection used for `GetTelemetrySubscriptions` and `PushTelemetry` requests.
connection: Arc<BrokerConnection>,
/// Metrics registry exported on every push.
metrics: Arc<KrafkaMetrics>,
/// Enable flag, metric prefix and resource attributes.
config: TelemetryConfig,
/// Shutdown signal: the reporter stops once the value is `true` or the sender is dropped.
shutdown: watch::Receiver<bool>,
/// Last-seen counter values, used to compute deltas under delta temporality.
delta_tracker: DeltaTracker,
/// Temporality used by the previous push; a change clears `delta_tracker`.
last_delta_temporality: bool,
}
impl TelemetryReporter {
    /// Creates a reporter bound to a single broker connection.
    ///
    /// Nothing is sent until [`TelemetryReporter::run`] is awaited. `shutdown` stops the
    /// reporter when its value becomes `true` or its sender is dropped.
    pub fn new(
        connection: Arc<BrokerConnection>,
        metrics: Arc<KrafkaMetrics>,
        config: TelemetryConfig,
        shutdown: watch::Receiver<bool>,
    ) -> Self {
        Self {
            connection,
            metrics,
            config,
            shutdown,
            delta_tracker: DeltaTracker::new(),
            last_delta_temporality: false,
        }
    }

    /// Drives the telemetry lifecycle until shutdown or an unrecoverable error.
    ///
    /// 1. Obtains a subscription, retrying transient failures with backoff.
    /// 2. Waits a jittered first interval (0.5x-1.5x of the push interval).
    /// 3. Pushes metrics every push interval. It re-subscribes when the broker invalidates the
    ///    subscription or when the subscription requests no metrics.
    /// 4. On shutdown, sends a final push with `terminating = true`.
    ///
    /// Returns immediately when [`TelemetryConfig::enabled`] is `false`.
    pub async fn run(mut self) {
        if !self.config.enabled {
            debug!("Telemetry push disabled by configuration");
            return;
        }
        info!("KIP-714 telemetry reporter starting");
        let mut subscription = match self.get_subscription_with_retry([0u8; 16]).await {
            Some(s) => s,
            None => {
                warn!("Failed to obtain telemetry subscription after retries; reporter exiting");
                return;
            }
        };
        info!(
            client_instance_id = ?subscription.client_instance_id,
            push_interval_ms = subscription.push_interval.as_millis(),
            requested_metrics = ?subscription.requested_metrics,
            "Telemetry subscription acquired"
        );
        // Randomise the first push so that clients started together do not push in lockstep.
        let jitter_factor: f64 = rand::rng().random_range(0.5..1.5);
        let first_delay = subscription.push_interval.mul_f64(jitter_factor);
        let collection_start = Self::nanos_since_epoch();
        if self.wait_or_shutdown(first_delay).await {
            self.send_terminating_push(&subscription, collection_start)
                .await;
            return;
        }
        let mut window_start = collection_start;
        loop {
            if subscription.has_metrics() {
                match self.push_metrics(&subscription, window_start, false).await {
                    // Transient failures are not retried here; the next interval tries again.
                    PushResult::Ok | PushResult::Transient => {}
                    PushResult::ReSubscribe => {
                        debug!("Subscription invalidated; re-subscribing");
                        if !self.resubscribe(&mut subscription).await {
                            return;
                        }
                    }
                    PushResult::Fatal => {
                        warn!("Fatal telemetry push error; reporter exiting");
                        return;
                    }
                }
            }
            window_start = Self::nanos_since_epoch();
            if self.wait_or_shutdown(subscription.push_interval).await {
                self.send_terminating_push(&subscription, window_start)
                    .await;
                return;
            }
            if !subscription.has_metrics() {
                // An empty subscription may be updated broker-side; poll for a new one.
                debug!("No metrics subscribed; re-checking subscription");
                if !self.resubscribe(&mut subscription).await {
                    return;
                }
            }
        }
    }

    /// Calls [`Self::get_subscription`] up to `MAX_RETRIES + 1` times with exponential backoff.
    ///
    /// Returns `None` in three cases:
    /// - the broker reports a non-retriable error;
    /// - every attempt fails transiently;
    /// - shutdown is signalled during a backoff wait.
    async fn get_subscription_with_retry(
        &mut self,
        client_instance_id: [u8; 16],
    ) -> Option<Subscription> {
        for attempt in 0..=MAX_RETRIES {
            if attempt > 0 {
                let backoff = retry_backoff(attempt);
                debug!(
                    attempt,
                    backoff_ms = backoff.as_millis(),
                    "Retrying GetTelemetrySubscriptions"
                );
                if self.wait_or_shutdown(backoff).await {
                    // Shutdown requested while backing off.
                    return None;
                }
            }
            match self.get_subscription(client_instance_id).await {
                SubscriptionResult::Ok(sub) => return Some(sub),
                SubscriptionResult::Transient => continue,
                SubscriptionResult::Fatal => return None,
            }
        }
        None
    }

    /// Replaces `subscription` with a freshly obtained one for the same client instance.
    ///
    /// On success the delta tracker is cleared, because the new subscription starts a new
    /// reporting series, and `true` is returned. On failure it logs and returns `false`; the
    /// caller should then stop the reporter.
    async fn resubscribe(&mut self, subscription: &mut Subscription) -> bool {
        match self
            .get_subscription_with_retry(subscription.client_instance_id)
            .await
        {
            Some(fresh) => {
                self.delta_tracker.reset();
                *subscription = fresh;
                true
            }
            None => {
                warn!("Re-subscription failed after retries; reporter exiting");
                false
            }
        }
    }

    /// Performs one `GetTelemetrySubscriptions` (v0) round-trip.
    ///
    /// Network and decode failures, as well as retriable broker errors, map to
    /// [`SubscriptionResult::Transient`]. Other broker errors map to
    /// [`SubscriptionResult::Fatal`]. The returned push interval is clamped to
    /// `[MIN_PUSH_INTERVAL_MS, MAX_PUSH_INTERVAL_MS]`.
    async fn get_subscription(&self, client_instance_id: [u8; 16]) -> SubscriptionResult {
        let req = GetTelemetrySubscriptionsRequest { client_instance_id };
        let response_bytes: Bytes = match self
            .connection
            .send_request(ApiKey::GetTelemetrySubscriptions, 0, |buf| {
                req.encode_v0(buf)
            })
            .await
        {
            Ok(b) => b,
            Err(e) => {
                warn!(error = %e, "GetTelemetrySubscriptions request failed");
                return SubscriptionResult::Transient;
            }
        };
        let resp = match GetTelemetrySubscriptionsResponse::decode_versioned(
            0,
            &mut response_bytes.as_ref(),
        ) {
            Ok(r) => r,
            Err(e) => {
                warn!(error = %e, "Failed to decode GetTelemetrySubscriptionsResponse");
                return SubscriptionResult::Transient;
            }
        };
        if resp.throttle_time_ms > 0 {
            debug!(throttle_ms = resp.throttle_time_ms, "Throttled by broker");
        }
        if resp.error_code != ErrorCode::None {
            warn!(
                error_code = ?resp.error_code,
                "GetTelemetrySubscriptions returned error"
            );
            return if resp.error_code.is_retriable() {
                SubscriptionResult::Transient
            } else {
                SubscriptionResult::Fatal
            };
        }
        let clamped_push_interval_ms = clamp_push_interval_ms(resp.push_interval_ms);
        if clamped_push_interval_ms != resp.push_interval_ms {
            debug!(
                raw_push_interval_ms = resp.push_interval_ms,
                clamped_push_interval_ms,
                "Clamped broker telemetry push interval to supported bounds"
            );
        }
        // The clamp guarantees a positive value, so the cast cannot wrap.
        let push_interval = Duration::from_millis(clamped_push_interval_ms as u64);
        // An all-zero id asks the broker to assign one; otherwise keep the id already in use.
        let effective_id = if client_instance_id == [0u8; 16] {
            resp.client_instance_id
        } else {
            client_instance_id
        };
        SubscriptionResult::Ok(Subscription {
            client_instance_id: effective_id,
            subscription_id: resp.subscription_id,
            push_interval,
            delta_temporality: resp.delta_temporality,
            telemetry_max_bytes: resp.telemetry_max_bytes,
            requested_metrics: resp.requested_metrics,
        })
    }

    /// Collects the subscribed metrics and sends one `PushTelemetry` (v0) request.
    ///
    /// Steps:
    /// - Counters go through the delta tracker when the subscription uses delta temporality.
    /// - The tracker is cleared whenever the temporality differs from the previous push.
    /// - A payload larger than a positive `telemetry_max_bytes` is dropped and reported as
    ///   [`PushResult::Ok`].
    /// - Broker error codes are mapped to the next action for the caller.
    async fn push_metrics(
        &mut self,
        subscription: &Subscription,
        window_start_nanos: u64,
        terminating: bool,
    ) -> PushResult {
        let mut exporter = OtlpExporter::new(subscription.delta_temporality, window_start_nanos);
        for (k, v) in &self.config.resource_attributes {
            exporter.add_resource_attribute(k.as_str(), v.as_str());
        }
        if subscription.delta_temporality != self.last_delta_temporality {
            debug!(
                old = self.last_delta_temporality,
                new = subscription.delta_temporality,
                "Delta temporality changed; resetting tracker"
            );
            self.delta_tracker.reset();
            self.last_delta_temporality = subscription.delta_temporality;
        }
        if subscription.has_metrics() {
            if subscription.delta_temporality {
                let mut delta_exp = DeltaExporter::new(&mut exporter, &mut self.delta_tracker);
                if subscription.wants_all_metrics() {
                    self.metrics
                        .export_all_with_prefix(&self.config.metrics_prefix, &mut delta_exp);
                } else {
                    let mut filter =
                        PrefixFilterExporter::new(&subscription.requested_metrics, &mut delta_exp);
                    self.metrics
                        .export_all_with_prefix(&self.config.metrics_prefix, &mut filter);
                }
            } else if subscription.wants_all_metrics() {
                self.metrics
                    .export_all_with_prefix(&self.config.metrics_prefix, &mut exporter);
            } else {
                let mut filter =
                    PrefixFilterExporter::new(&subscription.requested_metrics, &mut exporter);
                self.metrics
                    .export_all_with_prefix(&self.config.metrics_prefix, &mut filter);
            }
        }
        let payload = exporter.finish();
        if subscription.telemetry_max_bytes > 0
            && payload.len() > subscription.telemetry_max_bytes as usize
        {
            warn!(
                payload_bytes = payload.len(),
                max_bytes = subscription.telemetry_max_bytes,
                "Telemetry payload exceeds broker TelemetryMaxBytes; skipping push"
            );
            return PushResult::Ok;
        }
        let req = PushTelemetryRequest {
            client_instance_id: subscription.client_instance_id,
            subscription_id: subscription.subscription_id,
            terminating,
            // NOTE(review): 0 is assumed to be the "no compression" id; payload is not compressed.
            compression_type: 0,
            metrics: Bytes::from(payload),
        };
        let response_bytes: Bytes = match self
            .connection
            .send_request(ApiKey::PushTelemetry, 0, |buf| req.encode_v0(buf))
            .await
        {
            Ok(b) => b,
            Err(e) => {
                warn!(error = %e, "PushTelemetry request failed (transient)");
                return PushResult::Transient;
            }
        };
        let resp = match PushTelemetryResponse::decode_versioned(0, &mut response_bytes.as_ref()) {
            Ok(r) => r,
            Err(e) => {
                warn!(error = %e, "Failed to decode PushTelemetryResponse");
                return PushResult::Transient;
            }
        };
        if resp.throttle_time_ms > 0 {
            debug!(
                throttle_ms = resp.throttle_time_ms,
                "PushTelemetry throttled"
            );
        }
        match resp.error_code {
            ErrorCode::None => {
                debug!(
                    terminating,
                    payload_bytes = req.metrics.len(),
                    "PushTelemetry accepted"
                );
                PushResult::Ok
            }
            ErrorCode::UnknownSubscriptionId => {
                debug!("Broker returned UNKNOWN_SUBSCRIPTION_ID");
                PushResult::ReSubscribe
            }
            ErrorCode::UnsupportedCompressionType => {
                debug!("Broker returned UNSUPPORTED_COMPRESSION_TYPE");
                PushResult::ReSubscribe
            }
            ErrorCode::TelemetryTooLarge => {
                warn!(
                    payload_bytes = req.metrics.len(),
                    "Broker returned TELEMETRY_TOO_LARGE; re-subscribing for updated limits"
                );
                PushResult::ReSubscribe
            }
            ErrorCode::InvalidRequest | ErrorCode::InvalidRecord => {
                warn!(
                    error_code = ?resp.error_code,
                    "PushTelemetry rejected with non-retriable error; stopping"
                );
                PushResult::Fatal
            }
            ErrorCode::ThrottlingQuotaExceeded => {
                debug!("PushTelemetry throttled; will retry next interval");
                PushResult::Ok
            }
            other => {
                warn!(error_code = ?other, "PushTelemetry returned unexpected error");
                PushResult::Transient
            }
        }
    }

    /// [`Self::push_metrics`] with up to `MAX_RETRIES` extra attempts on transient failures.
    ///
    /// Non-transient results are returned immediately. Returns [`PushResult::Transient`] if
    /// every attempt failed transiently or a backoff wait is interrupted by shutdown.
    async fn push_metrics_with_retry(
        &mut self,
        subscription: &Subscription,
        window_start_nanos: u64,
        terminating: bool,
    ) -> PushResult {
        for attempt in 0..=MAX_RETRIES {
            if attempt > 0 {
                let backoff = retry_backoff(attempt);
                debug!(
                    attempt,
                    backoff_ms = backoff.as_millis(),
                    terminating,
                    "Retrying PushTelemetry"
                );
                if self.wait_or_shutdown(backoff).await {
                    return PushResult::Transient;
                }
            }
            match self
                .push_metrics(subscription, window_start_nanos, terminating)
                .await
            {
                PushResult::Transient if attempt < MAX_RETRIES => continue,
                result => return result,
            }
        }
        PushResult::Transient
    }

    /// Best-effort final push with `terminating = true`, sent when the reporter shuts down.
    ///
    /// Skipped when nothing is subscribed. If the broker asks for a re-subscribe, one
    /// subscription attempt is made (without retries) and the terminating push is re-sent.
    async fn send_terminating_push(&mut self, subscription: &Subscription, window_start: u64) {
        if !subscription.has_metrics() {
            debug!("No metrics subscribed; skipping terminating push");
            return;
        }
        info!("Sending terminating telemetry push");
        if let PushResult::ReSubscribe = self
            .push_metrics_with_retry(subscription, window_start, true)
            .await
        {
            debug!("Terminating push returned re-subscribe; attempting one re-subscribe");
            if let SubscriptionResult::Ok(new_sub) =
                self.get_subscription(subscription.client_instance_id).await
            {
                let _ = self
                    .push_metrics_with_retry(&new_sub, window_start, true)
                    .await;
            }
        }
    }

    /// Sleeps for `duration` unless shutdown is signalled first.
    ///
    /// Returns `true` as soon as the shutdown value changes to `true` or the sender is
    /// dropped. Returns `false` only after the full `duration` has elapsed. A change to `false`
    /// is ignored and the remaining time is still waited out, so a spurious notification
    /// cannot shorten a push interval or retry backoff.
    async fn wait_or_shutdown(&mut self, duration: Duration) -> bool {
        let sleep = tokio::time::sleep(duration);
        tokio::pin!(sleep);
        loop {
            tokio::select! {
                _ = &mut sleep => return false,
                result = self.shutdown.changed() => {
                    if result.is_err() || *self.shutdown.borrow() {
                        return true;
                    }
                    // Value changed but is not a shutdown request: keep waiting.
                }
            }
        }
    }

    /// Current wall-clock time as nanoseconds since the Unix epoch.
    ///
    /// Returns 0 if the clock is before the epoch, and saturates at `u64::MAX` instead of
    /// truncating the `u128` nanosecond count.
    fn nanos_since_epoch() -> u64 {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default();
        u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
    }
}
/// Outcome of a single `GetTelemetrySubscriptions` round-trip.
enum SubscriptionResult {
/// The broker returned a usable subscription.
Ok(Subscription),
/// Request, decode, or retriable broker failure; the caller may try again.
Transient,
/// Non-retriable broker error; the caller gives up.
Fatal,
}
/// Outcome of a single `PushTelemetry` round-trip, telling the caller what to do next.
enum PushResult {
/// Push accepted, deliberately skipped (payload too large), or throttled; keep the normal cadence.
Ok,
/// Subscription is stale (unknown id, unsupported compression, telemetry too large); fetch a new one.
ReSubscribe,
/// Request/decode failure or unexpected error code; try again later.
Transient,
/// Non-retriable rejection (`InvalidRequest` / `InvalidRecord`); stop reporting.
Fatal,
}
/// Remembers the last observed value of each counter so successive observations can be
/// turned into per-interval increases.
struct DeltaTracker {
    prev: HashMap<String, u64>,
}

impl DeltaTracker {
    /// Creates an empty tracker; the first observation of every counter is reported in full.
    fn new() -> Self {
        DeltaTracker {
            prev: HashMap::new(),
        }
    }

    /// Records `value` for `name` and returns the increase since the previous observation.
    ///
    /// The first observation returns `value` unchanged. A decrease yields `0`, and the
    /// smaller value becomes the new baseline.
    fn delta(&mut self, name: &str, value: u64) -> u64 {
        match self.prev.get_mut(name) {
            Some(slot) => {
                let previous = std::mem::replace(slot, value);
                value.saturating_sub(previous)
            }
            None => {
                self.prev.insert(name.to_owned(), value);
                value
            }
        }
    }

    /// Forgets every baseline so the next observation of each counter is reported in full.
    fn reset(&mut self) {
        self.prev.clear();
    }
}
/// `MetricsExporter` adapter that converts counters into deltas via a [`DeltaTracker`]
/// before forwarding them; gauges and latency snapshots are forwarded unchanged.
struct DeltaExporter<'a> {
    inner: &'a mut dyn MetricsExporter,
    tracker: &'a mut DeltaTracker,
}

impl<'a> DeltaExporter<'a> {
    /// Wraps `inner`, keeping counter baselines in `tracker`.
    fn new(inner: &'a mut dyn MetricsExporter, tracker: &'a mut DeltaTracker) -> Self {
        DeltaExporter { inner, tracker }
    }
}

impl MetricsExporter for DeltaExporter<'_> {
    fn export_counter(&mut self, name: &str, help: &str, value: u64) {
        // The baseline is updated before the increase is forwarded.
        let increase = self.tracker.delta(name, value);
        self.inner.export_counter(name, help, increase);
    }

    fn export_gauge(&mut self, name: &str, help: &str, value: u64) {
        // Gauges bypass the tracker.
        self.inner.export_gauge(name, help, value)
    }

    fn export_latency(
        &mut self,
        name: &str,
        help: &str,
        snapshot: &crate::metrics::LatencySnapshot,
    ) {
        // Latency snapshots bypass the tracker.
        self.inner.export_latency(name, help, snapshot)
    }
}
/// `MetricsExporter` adapter that forwards only metrics whose name starts with one of the
/// given prefixes; everything else is silently dropped.
struct PrefixFilterExporter<'a> {
    prefixes: &'a [String],
    inner: &'a mut dyn MetricsExporter,
}

impl<'a> PrefixFilterExporter<'a> {
    /// Wraps `inner`, letting through only names matching one of `prefixes`.
    fn new(prefixes: &'a [String], inner: &'a mut dyn MetricsExporter) -> Self {
        PrefixFilterExporter { prefixes, inner }
    }

    /// `true` if `name` starts with any prefix (an empty prefix matches every name).
    fn matches(&self, name: &str) -> bool {
        self.prefixes
            .iter()
            .map(String::as_str)
            .any(|prefix| name.starts_with(prefix))
    }
}

impl MetricsExporter for PrefixFilterExporter<'_> {
    fn export_counter(&mut self, name: &str, help: &str, value: u64) {
        if !self.matches(name) {
            return;
        }
        self.inner.export_counter(name, help, value);
    }

    fn export_gauge(&mut self, name: &str, help: &str, value: u64) {
        if !self.matches(name) {
            return;
        }
        self.inner.export_gauge(name, help, value);
    }

    fn export_latency(
        &mut self,
        name: &str,
        help: &str,
        snapshot: &crate::metrics::LatencySnapshot,
    ) {
        if !self.matches(name) {
            return;
        }
        self.inner.export_latency(name, help, snapshot);
    }
}
// Unit tests for the pure helpers and exporter adapters in this module. The broker-facing
// paths (subscription and push round-trips) are not exercised here.
#[cfg(test)]
// Test code is allowed to unwrap, expect and panic.
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
use super::*;
// Defaults: enabled, "org.apache.kafka" prefix, no resource attributes.
#[test]
fn test_telemetry_config_defaults() {
let config = TelemetryConfig::default();
assert!(config.enabled);
assert_eq!(config.metrics_prefix, "org.apache.kafka");
assert!(config.resource_attributes.is_empty());
}
// Sanity check that the wall clock is after 2020-01-01T00:00:00Z.
#[test]
fn test_nanos_since_epoch_is_reasonable() {
let nanos = TelemetryReporter::nanos_since_epoch();
let year_2020_nanos = 1_577_836_800_000_000_000u64;
assert!(nanos > year_2020_nanos);
}
// An empty request list means nothing is subscribed.
#[test]
fn test_subscription_has_metrics() {
let sub = Subscription {
client_instance_id: [0; 16],
subscription_id: 0,
push_interval: Duration::from_secs(300),
delta_temporality: false,
telemetry_max_bytes: 1_048_576,
requested_metrics: vec![],
};
assert!(!sub.has_metrics());
assert!(!sub.wants_all_metrics());
}
// A single "*" entry selects every metric.
#[test]
fn test_subscription_wants_all_metrics() {
let sub = Subscription {
client_instance_id: [0; 16],
subscription_id: 0,
push_interval: Duration::from_secs(300),
delta_temporality: false,
telemetry_max_bytes: 1_048_576,
requested_metrics: vec!["*".to_string()],
};
assert!(sub.has_metrics());
assert!(sub.wants_all_metrics());
}
// A single non-"*" entry is treated as a prefix filter.
#[test]
fn test_subscription_prefix_metrics() {
let sub = Subscription {
client_instance_id: [0; 16],
subscription_id: 0,
push_interval: Duration::from_secs(300),
delta_temporality: false,
telemetry_max_bytes: 1_048_576,
requested_metrics: vec!["org.apache.kafka.producer.".to_string()],
};
assert!(sub.has_metrics());
assert!(!sub.wants_all_metrics());
}
// Three of the six exported metrics match a prefix and reach the OTLP exporter.
#[test]
fn test_prefix_filter_exporter() {
let prefixes = vec![
"org.apache.kafka.producer.".to_string(),
"org.apache.kafka.consumer.lag".to_string(),
];
let mut otlp = OtlpExporter::new(false, 0);
{
let mut filter = PrefixFilterExporter::new(&prefixes, &mut otlp);
filter.export_counter("org.apache.kafka.producer.records_sent", "help", 10);
filter.export_gauge("org.apache.kafka.consumer.lag", "help", 5);
filter.export_latency(
"org.apache.kafka.producer.send_latency",
"help",
&crate::metrics::LatencySnapshot {
count: 1,
sum: Duration::from_millis(50),
min: Some(Duration::from_millis(50)),
max: Some(Duration::from_millis(50)),
avg: Some(Duration::from_millis(50)),
},
);
filter.export_counter("org.apache.kafka.consumer.polls", "help", 99);
filter.export_gauge("org.apache.kafka.connection.active", "help", 3);
filter.export_latency(
"org.apache.kafka.connection.latency",
"help",
&crate::metrics::LatencySnapshot {
count: 1,
sum: Duration::from_millis(10),
min: Some(Duration::from_millis(10)),
max: Some(Duration::from_millis(10)),
avg: Some(Duration::from_millis(10)),
},
);
}
// NOTE(review): 7 assumes OtlpExporter emits one latency snapshot as several metrics
// (1 counter + 1 gauge + 5 latency series) — confirm against OtlpExporter.
assert_eq!(otlp.finish_metric_count(), 7);
}
// Out-of-range, zero and negative intervals are clamped to [100 ms, 1 h].
#[test]
fn test_subscription_push_interval_clamped_to_supported_bounds() {
let check = |raw: i32, expected_ms: u64| {
let clamped = clamp_push_interval_ms(raw) as u64;
assert_eq!(clamped, expected_ms);
};
check(0, 100);
check(-1, 100);
check(50, 100);
check(100, 100);
check(300_000, 300_000);
check(MAX_PUSH_INTERVAL_MS + 1, MAX_PUSH_INTERVAL_MS as u64);
check(i32::MAX, MAX_PUSH_INTERVAL_MS as u64);
}
// Backoff doubles per retry starting from 1s.
#[test]
fn test_retry_backoff_exponential() {
assert_eq!(retry_backoff(1), Duration::from_secs(1));
assert_eq!(retry_backoff(2), Duration::from_secs(2));
assert_eq!(retry_backoff(3), Duration::from_secs(4));
}
// Several prefixes: subscribed, but not "all metrics".
#[test]
fn test_subscription_multiple_prefixes() {
let sub = Subscription {
client_instance_id: [0; 16],
subscription_id: 0,
push_interval: Duration::from_secs(300),
delta_temporality: false,
telemetry_max_bytes: 1_048_576,
requested_metrics: vec![
"org.apache.kafka.producer.".to_string(),
"org.apache.kafka.consumer.".to_string(),
],
};
assert!(sub.has_metrics());
assert!(!sub.wants_all_metrics());
}
// Counters are tracked independently; a decrease reports 0.
#[test]
fn test_delta_tracker_computes_deltas() {
let mut tracker = DeltaTracker::new();
assert_eq!(tracker.delta("counter_a", 10), 10);
assert_eq!(tracker.delta("counter_b", 5), 5);
assert_eq!(tracker.delta("counter_a", 25), 15);
assert_eq!(tracker.delta("counter_b", 5), 0);
assert_eq!(tracker.delta("counter_a", 20), 0);
}
// After reset the next observation is reported in full.
#[test]
fn test_delta_tracker_reset() {
let mut tracker = DeltaTracker::new();
tracker.delta("c", 100);
tracker.reset();
assert_eq!(tracker.delta("c", 50), 50);
}
// The same tracker is shared across two exports so the second counter value becomes a
// delta. The two encoded payloads are expected to differ.
#[test]
fn test_delta_exporter_converts_counters_only() {
let mut otlp = OtlpExporter::new(true, 0);
let mut tracker = DeltaTracker::new();
{
let mut dexp = DeltaExporter::new(&mut otlp, &mut tracker);
dexp.export_counter("c", "help", 100);
dexp.export_gauge("g", "help", 42);
}
assert_eq!(otlp.finish_metric_count(), 2);
let mut otlp2 = OtlpExporter::new(true, 0);
{
let mut dexp = DeltaExporter::new(&mut otlp2, &mut tracker);
dexp.export_counter("c", "help", 130);
dexp.export_gauge("g", "help", 99);
}
assert_eq!(otlp2.finish_metric_count(), 2);
let data1 = otlp.finish();
let data2 = otlp2.finish();
assert_ne!(data1, data2);
}
// Filter in front of the delta exporter: only "prod." counters reach the tracker.
#[test]
fn test_delta_exporter_with_prefix_filter() {
let prefixes = vec!["prod.".to_string()];
let mut otlp = OtlpExporter::new(true, 0);
let mut tracker = DeltaTracker::new();
{
let mut dexp = DeltaExporter::new(&mut otlp, &mut tracker);
let mut filter = PrefixFilterExporter::new(&prefixes, &mut dexp);
filter.export_counter("prod.sent", "help", 50);
// "cons.recv" does not match "prod." and is dropped before the delta exporter.
filter.export_counter("cons.recv", "help", 99); }
assert_eq!(otlp.finish_metric_count(), 1);
let mut otlp2 = OtlpExporter::new(true, 0);
{
let mut dexp = DeltaExporter::new(&mut otlp2, &mut tracker);
let mut filter = PrefixFilterExporter::new(&prefixes, &mut dexp);
filter.export_counter("prod.sent", "help", 80);
}
assert_eq!(otlp2.finish_metric_count(), 1);
}
}