krafka 0.7.0

A pure Rust, async-native Apache Kafka client
---
layout: default
title: Metrics & Observability
nav_order: 8
description: "Metrics collection and Prometheus export"
---

# Metrics & Observability Guide

This guide covers metrics collection and export in Krafka.

## Overview

Krafka provides built-in metrics collection that is automatically wired into all hot paths:

- **Producer metrics**: Records sent, bytes, batches, errors, retries, send latency — recorded in `send()`, `send_to_partition()`, and batch accumulator flush
- **Consumer metrics**: Records received, polls, fetches, commits, rebalances, assigned partitions, lag, poll latency — recorded in `poll()`, `commit()`, and `close()`
- **Connection metrics**: Connections created/closed, errors, establishment latency, priority scheduling counters, and broker throttle delays

All metrics are lock-free using atomic operations for minimal performance impact. Access metrics via `producer.metrics_handle()`, `consumer.metrics()`, or the connection metric handles exposed by producer, consumer, share-consumer, admin, and connection-pool APIs.

## Pluggable Export

Krafka uses a trait-based export system. Implement `MetricsExporter` to add any
backend. Built-in exporters:

| Exporter | Format | Dependency |
|----------|--------|------------|
| `PrometheusExporter` | Prometheus text exposition | None |
| `JsonExporter` | JSON array of metric objects | None |
| `OtlpExporter` | OTLP MetricsData v1 protobuf | `telemetry` feature |

### Custom Exporter

```rust
use krafka::metrics::{MetricsExporter, LatencySnapshot};

struct StatsDExporter { /* ... */ }

impl MetricsExporter for StatsDExporter {
    fn export_counter(&mut self, name: &str, _help: &str, value: u64) {
        // send_udp(format!("{name}:{value}|c"));
    }
    fn export_gauge(&mut self, name: &str, _help: &str, value: u64) {
        // send_udp(format!("{name}:{value}|g"));
    }
    fn export_latency(&mut self, name: &str, _help: &str, snapshot: &LatencySnapshot) {
        // send_udp(format!("{name}.count:{0}|g", snapshot.count));
    }
}
```
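
To try an exporter implementation without a Kafka cluster, the sketch below buffers StatsD lines in memory instead of sending UDP packets. The `MetricsExporter` trait and `LatencySnapshot` struct here are local stand-ins that mirror the signatures shown above (only a `count` field is assumed), and `StatsdLines` is a hypothetical name. In real code, import the trait from `krafka::metrics` instead.

```rust
// Local stand-ins for illustration only; in real code use
// `krafka::metrics::{MetricsExporter, LatencySnapshot}`.
pub struct LatencySnapshot {
    pub count: u64, // assumed field; the real struct may carry more
}

pub trait MetricsExporter {
    fn export_counter(&mut self, name: &str, help: &str, value: u64);
    fn export_gauge(&mut self, name: &str, help: &str, value: u64);
    fn export_latency(&mut self, name: &str, help: &str, snapshot: &LatencySnapshot);
}

/// Hypothetical exporter that buffers StatsD lines instead of sending UDP.
#[derive(Default)]
pub struct StatsdLines {
    pub lines: Vec<String>,
}

impl MetricsExporter for StatsdLines {
    fn export_counter(&mut self, name: &str, _help: &str, value: u64) {
        self.lines.push(format!("{name}:{value}|c"));
    }
    fn export_gauge(&mut self, name: &str, _help: &str, value: u64) {
        self.lines.push(format!("{name}:{value}|g"));
    }
    fn export_latency(&mut self, name: &str, _help: &str, snapshot: &LatencySnapshot) {
        self.lines.push(format!("{name}.count:{}|g", snapshot.count));
    }
}

fn main() {
    let mut exporter = StatsdLines::default();
    exporter.export_counter("krafka_producer_records_sent", "Total records sent", 3);
    exporter.export_latency(
        "krafka_producer_send_latency",
        "Send latency",
        &LatencySnapshot { count: 2 },
    );
    for line in &exporter.lines {
        println!("{line}");
    }
}
```

Note that StatsD `|c` values are increments, so a production exporter would likely send the change since the last flush rather than the cumulative total.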

## Basic Usage

Each client type has its own metrics struct, which can also be constructed and updated directly:

```rust
use krafka::metrics::{ProducerMetrics, ConsumerMetrics, ConnectionMetrics, MetricsVisitable};

let producer_metrics = ProducerMetrics::new();
let consumer_metrics = ConsumerMetrics::new();
let connection_metrics = ConnectionMetrics::new();

// Record some metrics
producer_metrics.record_send(100);
producer_metrics.send_latency.record(std::time::Duration::from_millis(5));

// Get a snapshot
let snapshot = producer_metrics.snapshot();
println!("Records sent: {}", snapshot.records_sent);
println!("Bytes sent: {}", snapshot.bytes_sent);
```

## Prometheus Export

```rust
use krafka::metrics::{ProducerMetrics, MetricsVisitable};

let metrics = ProducerMetrics::new();
metrics.record_send(100);
metrics.record_batch(5);

// Export in Prometheus text format (convenience method)
let prometheus_output = metrics.to_prometheus_text("krafka_producer");
println!("{}", prometheus_output);
```

Or use the exporter directly:

```rust
use krafka::metrics::{ProducerMetrics, PrometheusExporter, MetricsVisitable};

let metrics = ProducerMetrics::new();
let mut exporter = PrometheusExporter::new();
metrics.export_metrics("krafka_producer", &mut exporter);
let output = exporter.finish();
```
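
For orientation, the Prometheus text exposition format consists of `# HELP` and `# TYPE` comment lines followed by one sample per line. The sketch below renders a single counter by hand. `render_counter` is a hypothetical helper, and the exact metric names and help strings Krafka emits may differ.

```rust
/// Hypothetical helper: render one counter in Prometheus text exposition format.
fn render_counter(name: &str, help: &str, value: u64) -> String {
    format!("# HELP {name} {help}\n# TYPE {name} counter\n{name} {value}\n")
}

fn main() {
    print!(
        "{}",
        render_counter(
            "krafka_producer_records_sent_total",
            "Total records sent successfully",
            1,
        )
    );
}
```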

## JSON Export

```rust
use krafka::metrics::{ProducerMetrics, JsonExporter, MetricsVisitable};

let metrics = ProducerMetrics::new();
metrics.record_send(100);

let mut exporter = JsonExporter::new();
metrics.export_metrics("krafka_producer", &mut exporter);
let json = exporter.finish();
// [{"name":"krafka_producer_records_sent","type":"counter","help":"Total records sent","value":1}, ...]
```

## Aggregated Metrics

Use `KrafkaMetrics` to collect and export all metrics from multiple components:

```rust
use krafka::metrics::KrafkaMetrics;

let metrics = KrafkaMetrics::new();

// Get shared metrics handles for your clients
let producer_metrics = metrics.producer_metrics();
let consumer_metrics = metrics.consumer_metrics();
let connection_metrics = metrics.connection_metrics();

// Record metrics during operations
producer_metrics.record_send(100);
consumer_metrics.record_poll(5);

// Export all metrics in a single call
let all_metrics = metrics.to_prometheus_text();
println!("{}", all_metrics);

// Export as JSON
let json = metrics.to_json();

// Use a custom exporter
use krafka::metrics::PrometheusExporter;
let mut exporter = PrometheusExporter::new();
metrics.export_all(&mut exporter);
let output = exporter.finish();

// Reset all metrics (e.g., after a push-style delta export; Prometheus scrapes expect monotonic counters)
metrics.reset();
```

## HTTP Metrics Endpoint

For production use, expose metrics via HTTP:

```rust
use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

// Create shared metrics registry
let metrics = Arc::new(KrafkaMetrics::new());

// In your HTTP server handler (pseudo-code):
async fn metrics_handler(metrics: Arc<KrafkaMetrics>) -> String {
    metrics.to_prometheus_text()
}
```

Example with Axum:

```rust
use axum::{routing::get, Router, Extension};
use std::sync::Arc;
use krafka::metrics::KrafkaMetrics;

async fn metrics_handler(Extension(metrics): Extension<Arc<KrafkaMetrics>>) -> String {
    metrics.to_prometheus_text()
}

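#[tokio::main]
async fn main() {
    let metrics = Arc::new(KrafkaMetrics::new());

    let app = Router::new()
        .route("/metrics", get(metrics_handler))
        .layer(Extension(metrics.clone()));

    // Use metrics.producer_metrics() etc. with your Kafka clients

    // Serve the router (axum 0.7+ API; the bind address is only an example)
    let listener = tokio::net::TcpListener::bind("0.0.0.0:9090").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}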
```

## Available Metrics

### Producer Metrics

| Metric | Type | Description |
|--------|------|-------------|
| `records_sent_total` | Counter | Total records sent successfully |
| `bytes_sent_total` | Counter | Total bytes sent (record values) |
| `batches_sent_total` | Counter | Total batches sent |
| `errors_total` | Counter | Total send errors |
| `retries_total` | Counter | Total retry attempts |
| `connections` | Gauge | Current active connections |
| `buffered_records` | Gauge | Currently buffered records |
| `send_latency_seconds` | Summary | Send latency statistics |

### Consumer Metrics

| Metric | Type | Description |
|--------|------|-------------|
| `records_received_total` | Counter | Total records received |
| `bytes_received_total` | Counter | Total bytes received |
| `fetches_total` | Counter | Total fetch requests |
| `polls_total` | Counter | Total poll operations |
| `empty_polls_total` | Counter | Polls that returned no records |
| `commits_total` | Counter | Total offset commits |
| `errors_total` | Counter | Total errors |
| `rebalances_total` | Counter | Total rebalance operations |
| `lag` | Gauge | Total consumer lag across all assigned partitions |
| `lag_max` | Gauge | Maximum per-partition consumer lag |
| `assigned_partitions` | Gauge | Currently assigned partitions |
| `paused_partitions` | Gauge | Currently paused partitions |
| `buffered_records` | Gauge | Currently buffered records in recv() buffer |
| `poll_latency_seconds` | Summary | Poll latency statistics |
| `fetch_latency_seconds` | Summary | Fetch latency statistics |
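
The `lag` and `lag_max` gauges relate as a sum and a maximum over per-partition lag. As an illustration, assuming per-partition lag is the log end offset minus the consumer's current position (this guide does not show how Krafka computes it internally), the aggregation can be sketched as follows. `PartitionOffsets` and `aggregate_lag` are hypothetical names.

```rust
/// Hypothetical per-partition view: log end offset and consumer position.
struct PartitionOffsets {
    log_end_offset: u64,
    position: u64,
}

/// Returns (total lag, maximum per-partition lag).
fn aggregate_lag(partitions: &[PartitionOffsets]) -> (u64, u64) {
    let mut total = 0u64;
    let mut max = 0u64;
    for p in partitions {
        // Saturate so a position ahead of a stale end offset never underflows.
        let lag = p.log_end_offset.saturating_sub(p.position);
        total += lag;
        max = max.max(lag);
    }
    (total, max)
}

fn main() {
    let parts = [
        PartitionOffsets { log_end_offset: 120, position: 100 },
        PartitionOffsets { log_end_offset: 50, position: 50 },
        PartitionOffsets { log_end_offset: 10, position: 3 },
    ];
    let (lag, lag_max) = aggregate_lag(&parts);
    println!("lag={lag} lag_max={lag_max}");
}
```

A large `lag_max` with a modest `lag` usually points at a single slow or hot partition rather than a consumer group that is behind overall.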

### Connection Metrics

| Metric | Type | Description |
|--------|------|-------------|
| `connections_created_total` | Counter | Total connections created |
| `connections_closed_total` | Counter | Total connections closed |
| `connection_errors_total` | Counter | Connection errors |
| `high_priority_requests_total` | Counter | High-priority requests sent |
| `normal_priority_requests_total` | Counter | Normal-priority requests sent |
| `high_priority_bypasses_total` | Counter | High-priority requests processed ahead of normal-priority work |
| `high_priority_bypass_yields_total` | Counter | Forced normal-priority drain steps after exhausting the high-priority bypass budget |
| `throttle_delays_total` | Counter | Normal-priority requests delayed due to broker throttling |
| `throttle_delay_ms_total` | Counter | Total broker-throttle delay applied to normal-priority requests, in milliseconds |
| `active_connections` | Gauge | Current active connections |
| `connect_latency_seconds` | Summary | Connection establishment latency |

## Latency Tracking

`LatencyTracker` records the count, minimum, maximum, average, and sum of the durations it observes:

```rust
use krafka::metrics::LatencyTracker;
use std::time::Duration;

let tracker = LatencyTracker::new();

// Manual recording
tracker.record(Duration::from_millis(50));
tracker.record(Duration::from_millis(100));

// Or use guard for automatic timing
{
    let _guard = tracker.start();
    // ... operation being timed ...
} // Guard records latency when dropped

// Get statistics
println!("Count: {}", tracker.count());
println!("Min: {:?}", tracker.min());
println!("Max: {:?}", tracker.max());
println!("Avg: {:?}", tracker.avg());
println!("Sum: {:?}", tracker.sum());

// Get immutable snapshot
let snapshot = tracker.snapshot();
```
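
The guard pattern relies on `Drop`: the guard captures a start `Instant` and records the elapsed time when it leaves scope. Below is a minimal std-only sketch of the idea, using hypothetical `Timer` and `TimerGuard` types that are not Krafka's implementation.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

/// Hypothetical tracker: sample count and total elapsed nanoseconds.
#[derive(Default)]
struct Timer {
    count: AtomicU64,
    sum_nanos: AtomicU64,
}

/// Records elapsed time into the parent `Timer` when dropped.
struct TimerGuard<'a> {
    timer: &'a Timer,
    started: Instant,
}

impl Timer {
    fn start(&self) -> TimerGuard<'_> {
        TimerGuard { timer: self, started: Instant::now() }
    }
}

impl Drop for TimerGuard<'_> {
    fn drop(&mut self) {
        let nanos = self.started.elapsed().as_nanos() as u64;
        self.timer.count.fetch_add(1, Ordering::Relaxed);
        self.timer.sum_nanos.fetch_add(nanos, Ordering::Relaxed);
    }
}

fn main() {
    let timer = Timer::default();
    {
        let _guard = timer.start();
        std::thread::sleep(Duration::from_millis(2));
    } // guard dropped here, one sample recorded
    println!("samples: {}", timer.count.load(Ordering::Relaxed));
}
```

Binding the guard to a named variable such as `_guard` matters: `let _ = tracker.start();` drops the guard immediately and records a near-zero latency.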

## Integration with OpenTelemetry

### Built-in OTLP Export (feature `telemetry`)

Enable the `telemetry` feature for native OTLP protobuf export and KIP-714 broker telemetry:

```toml
[dependencies]
krafka = { version = "...", features = ["telemetry"] }
```

Export metrics as OTLP protobuf bytes for ingestion by any OTLP-compatible backend:

```rust
use krafka::telemetry::otlp::OtlpExporter;
use krafka::metrics::{KrafkaMetrics, MetricsVisitable};

let metrics = KrafkaMetrics::new();
// ... record metrics ...

let mut exporter = OtlpExporter::new(true, 0); // delta temporality
exporter.add_resource_attribute("service.name", "my-service");
metrics.export_all(&mut exporter);
let otlp_bytes: Vec<u8> = exporter.finish();
// Send otlp_bytes to your OTLP receiver via gRPC or HTTP
```

### KIP-714 Automatic Telemetry

The `TelemetryReporter` implements KIP-714 client telemetry — it subscribes to the
broker's telemetry endpoint and pushes metric snapshots on the broker-specified interval:

```rust
use krafka::telemetry::reporter::{TelemetryReporter, TelemetryConfig};

let config = TelemetryConfig {
    enabled: true,
    metrics_prefix: "krafka".into(),
    resource_attributes: vec![
        ("service.name".into(), "my-app".into()),
    ],
};

// `connection`, `krafka_metrics`, and `shutdown_rx` are created elsewhere in your application
let reporter = TelemetryReporter::new(connection, krafka_metrics, config, shutdown_rx);
tokio::spawn(reporter.run());
```

The reporter handles subscription polling, push interval jitter, re-subscription on
`UNKNOWN_SUBSCRIPTION_ID`, and a graceful terminating push on shutdown — all per KIP-714.

### Manual Bridge to External OTel SDKs

You can also bridge metrics to an external OpenTelemetry SDK using snapshots or a custom exporter:

```rust
use krafka::metrics::{KrafkaMetrics, ProducerMetricsSnapshot};

fn export_to_otel(snapshot: &ProducerMetricsSnapshot) {
    // Use your OpenTelemetry SDK to record metrics
    // meter.create_counter("krafka.records_sent").add(snapshot.records_sent);
}
```
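
OpenTelemetry counters take increments, while snapshot fields such as `records_sent` are cumulative totals, so a bridge typically forwards the difference between consecutive snapshots. The sketch below shows that bookkeeping with a local stand-in struct (only the `records_sent` and `bytes_sent` fields used earlier in this guide are assumed). `DeltaTracker` is a hypothetical helper.

```rust
/// Local stand-in for the two snapshot fields used in this guide.
#[derive(Clone, Copy, Default)]
struct Snapshot {
    records_sent: u64,
    bytes_sent: u64,
}

/// Hypothetical helper that remembers the previous snapshot.
#[derive(Default)]
struct DeltaTracker {
    last: Snapshot,
}

impl DeltaTracker {
    /// Returns (records, bytes) added since the previous call.
    fn delta(&mut self, current: Snapshot) -> (u64, u64) {
        // If a total went backwards the metrics were reset, so the
        // current value is the whole increment.
        let diff = |now: u64, before: u64| if now >= before { now - before } else { now };
        let d = (
            diff(current.records_sent, self.last.records_sent),
            diff(current.bytes_sent, self.last.bytes_sent),
        );
        self.last = current;
        d
    }
}

fn main() {
    let mut tracker = DeltaTracker::default();
    let first = tracker.delta(Snapshot { records_sent: 10, bytes_sent: 1_000 });
    let second = tracker.delta(Snapshot { records_sent: 15, bytes_sent: 1_600 });
    // Pass each delta to your SDK's counter `add` call.
    println!("{first:?} {second:?}");
}
```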

## Performance Considerations

- All metrics use atomic operations (lock-free)
- Counter increments use `Ordering::Relaxed` for minimal overhead
- Latency tracking uses compare-and-swap for min/max updates
- Gauge updates are immediate (no aggregation)
- Gauge `dec()` saturates at zero instead of wrapping around, which keeps connection and partition counts correct
- Prometheus and JSON export only happen on request (pull-based)
- OTLP protobuf encoding is zero-copy where possible; no external protobuf dependency
- KIP-714 telemetry push runs on a background task with broker-controlled intervals
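
The atomic techniques listed above can be sketched with the standard library alone. This illustrates the general patterns (relaxed increments, a saturating decrement, and a compare-and-swap loop for the maximum) and is not Krafka's actual source. All type names are hypothetical.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct Counter(AtomicU64);

impl Counter {
    fn inc(&self) {
        // Relaxed suffices: only the count matters, not ordering with other memory.
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

#[derive(Default)]
struct Gauge(AtomicU64);

impl Gauge {
    fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }
    fn dec(&self) {
        // fetch_update runs a CAS retry loop; saturating_sub stops at zero.
        let _ = self
            .0
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| Some(v.saturating_sub(1)));
    }
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

#[derive(Default)]
struct MaxTracker(AtomicU64);

impl MaxTracker {
    fn observe(&self, value: u64) {
        let mut current = self.0.load(Ordering::Relaxed);
        // Retry until our value is stored or another thread stored a larger one.
        while value > current {
            match self.0.compare_exchange_weak(current, value, Ordering::Relaxed, Ordering::Relaxed) {
                Ok(_) => break,
                Err(actual) => current = actual,
            }
        }
    }
    fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

fn main() {
    let counter = Counter::default();
    // Four threads increment concurrently without any lock.
    std::thread::scope(|s| {
        for _ in 0..4 {
            s.spawn(|| {
                for _ in 0..1000 {
                    counter.inc();
                }
            });
        }
    });
    let gauge = Gauge::default();
    gauge.dec(); // stays at zero
    let max = MaxTracker::default();
    max.observe(5);
    max.observe(9);
    println!("{} {} {}", counter.get(), gauge.get(), max.get());
}
```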

## Next Steps

- [Producer Guide](producer.md) - Configure producer metrics
- [Consumer Guide](consumer.md) - Configure consumer metrics
- [Configuration Reference](configuration.md) - All configuration options