hyperi-rustlib 2.5.5

Opinionated Rust framework for high-throughput data pipelines at petabyte scale. It auto-wires configuration, logging, metrics, tracing, health checks, and graceful shutdown, built from many years of production infrastructure experience.
// Project:   hyperi-rustlib
// File:      src/transport/kafka/metrics.rs
// Purpose:   Kafka metrics collection via librdkafka statistics
// Language:  Rust
//
// License:   FSL-1.1-ALv2
// Copyright: (c) 2026 HYPERI PTY LIMITED

//! Kafka metrics collection via librdkafka statistics callback.
//!
//! Provides a `StatsContext` implementation that collects librdkafka statistics
//! and exposes them through a `KafkaMetrics` snapshot. Matches the Python
//! `hs_pylib.kafka.KafkaMetricsCollector` API.
//!
//! ## Usage
//!
//! Enable statistics by setting `statistics.interval.ms` in the Kafka config:
//!
//! ```rust,ignore
//! use hyperi_rustlib::transport::kafka::{KafkaConfig, KafkaMetrics, StatsContext};
//! use std::sync::Arc;
//!
//! let stats = Arc::new(StatsContext::new());
//! let mut config = KafkaConfig::default();
//! config.extra_config.insert("statistics.interval.ms".to_string(), "5000".to_string());
//!
//! // Use stats.clone() as the context when creating consumer/producer
//! // Then periodically:
//! let metrics = stats.get_metrics();
//! println!("Messages sent: {}", metrics.messages_sent);
//! println!("Consumer lag: {:?}", metrics.partition_lag);
//! ```
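//!
//! If you construct the client directly with rdkafka, the context can be
//! attached via `create_with_context` and read back through the client's
//! context handle. A minimal sketch (consumer type, broker address, and group
//! id are illustrative only):
//!
//! ```rust,ignore
//! use hyperi_rustlib::transport::kafka::StatsContext;
//! use rdkafka::config::ClientConfig;
//! use rdkafka::consumer::{Consumer, StreamConsumer};
//!
//! let consumer: StreamConsumer<StatsContext> = ClientConfig::new()
//!     .set("bootstrap.servers", "localhost:9092")
//!     .set("group.id", "pipeline-metrics-demo")
//!     .set("statistics.interval.ms", "5000")
//!     .create_with_context(StatsContext::new())?;
//!
//! // The client owns the context; read the latest snapshot through it.
//! let metrics = consumer.context().get_metrics();
//! ```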

use rdkafka::client::ClientContext;
use rdkafka::config::RDKafkaLogLevel;
use rdkafka::error::KafkaError;
use rdkafka::statistics::Statistics;
use std::collections::HashMap;
use std::sync::RwLock;

/// Kafka metrics snapshot.
///
/// Contains aggregated statistics from librdkafka, matching the Python
/// `KafkaMetrics` dataclass structure.
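///
/// A snapshot is obtained from [`StatsContext::get_metrics`]; a brief example
/// using the helper functions defined in this module:
///
/// ```rust,ignore
/// let m = stats_context.get_metrics();
/// println!("total lag: {}", total_consumer_lag(&m));
/// println!("healthy brokers: {}", healthy_broker_count(&m));
/// ```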
#[derive(Debug, Clone, Default)]
pub struct KafkaMetrics {
    // --- Client-level metrics ---
    /// Total messages sent (produced).
    pub messages_sent: i64,
    /// Total messages received (consumed).
    pub messages_received: i64,
    /// Total bytes sent.
    pub bytes_sent: i64,
    /// Total bytes received.
    pub bytes_received: i64,
    /// Current messages in producer queue.
    pub queue_message_count: u64,
    /// Current bytes in producer queue.
    pub queue_byte_count: u64,

    // --- Per-broker metrics ---
    /// Per-broker statistics keyed by broker name.
    pub brokers: HashMap<String, BrokerMetrics>,

    // --- Per-partition metrics (consumer) ---
    /// Per-partition consumer lag keyed by (topic, partition).
    pub partition_lag: HashMap<(String, i32), i64>,
    /// Per-partition committed offsets.
    pub partition_committed: HashMap<(String, i32), i64>,
    /// Per-partition high watermarks.
    pub partition_high_watermark: HashMap<(String, i32), i64>,

    // --- Consumer group metrics ---
    /// Consumer group state (e.g., "up", "rebalancing").
    pub consumer_group_state: Option<String>,
    /// Total number of rebalances.
    pub rebalance_count: i64,
    /// Time since last rebalance in milliseconds.
    pub rebalance_age_ms: i64,

    // --- Timestamp ---
    /// Unix timestamp when these stats were collected.
    pub timestamp: i64,
}

/// Per-broker metrics.
#[derive(Debug, Clone, Default)]
pub struct BrokerMetrics {
    /// Broker state ("UP", "DOWN", "INIT", etc.).
    pub state: String,
    /// Average round-trip time in milliseconds.
    pub rtt_avg_ms: f64,
    /// 99th percentile RTT in milliseconds.
    pub rtt_p99_ms: f64,
    /// Total throttle time in milliseconds.
    pub throttle_time_ms: i64,
    /// Messages in output buffer.
    pub outbuf_msg_cnt: i64,
    /// Requests waiting for response.
    pub waitresp_cnt: i64,
    /// Total requests sent.
    pub requests_sent: u64,
    /// Total responses received.
    pub responses_received: u64,
    /// Total request errors.
    pub request_errors: u64,
}

/// Statistics-collecting client context.
///
/// Implements `ClientContext` to receive librdkafka statistics callbacks.
/// Use with consumers or producers that need metrics collection.
///
/// Thread-safe: can be shared across multiple Kafka clients.
#[derive(Debug)]
pub struct StatsContext {
    stats: RwLock<Option<Statistics>>,
    latest_metrics: RwLock<KafkaMetrics>,
}

impl Default for StatsContext {
    fn default() -> Self {
        Self::new()
    }
}

impl StatsContext {
    /// Create a new statistics context.
    #[must_use]
    pub fn new() -> Self {
        Self {
            stats: RwLock::new(None),
            latest_metrics: RwLock::new(KafkaMetrics::default()),
        }
    }

    /// Get the latest metrics snapshot.
    ///
    /// Returns a clone of the most recently collected metrics.
    #[must_use]
    pub fn get_metrics(&self) -> KafkaMetrics {
        self.latest_metrics
            .read()
            .map(|m| m.clone())
            .unwrap_or_default()
    }

    /// Get the raw librdkafka statistics.
    ///
    /// Returns the full `Statistics` struct from the last callback.
    #[must_use]
    pub fn get_raw_stats(&self) -> Option<Statistics> {
        self.stats.read().ok().and_then(|s| s.clone())
    }

    /// Convert raw statistics to our metrics format.
    fn convert_stats(stats: &Statistics) -> KafkaMetrics {
        let mut metrics = KafkaMetrics {
            messages_sent: stats.txmsgs,
            messages_received: stats.rxmsgs,
            bytes_sent: stats.tx_bytes,
            bytes_received: stats.rx_bytes,
            queue_message_count: stats.msg_cnt,
            queue_byte_count: stats.msg_size,
            timestamp: stats.time,
            ..Default::default()
        };

        // Per-broker metrics
        for (name, broker) in &stats.brokers {
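            // librdkafka reports RTT windows in microseconds; convert to milliseconds.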
            let rtt_avg_ms = broker.rtt.as_ref().map_or(0.0, |w| w.avg as f64 / 1000.0);
            let rtt_p99_ms = broker.rtt.as_ref().map_or(0.0, |w| w.p99 as f64 / 1000.0);
            let throttle_time_ms = broker.throttle.as_ref().map_or(0, |w| w.sum);

            metrics.brokers.insert(
                name.clone(),
                BrokerMetrics {
                    state: broker.state.clone(),
                    rtt_avg_ms,
                    rtt_p99_ms,
                    throttle_time_ms,
                    outbuf_msg_cnt: broker.outbuf_msg_cnt,
                    waitresp_cnt: broker.waitresp_cnt,
                    requests_sent: broker.tx,
                    responses_received: broker.rx,
                    request_errors: broker.txerrs,
                },
            );
        }

        // Per-partition metrics from topics
        for (topic_name, topic) in &stats.topics {
            for (partition_id, partition) in &topic.partitions {
                let key = (topic_name.clone(), *partition_id);

                // Consumer lag
                if partition.consumer_lag >= 0 {
                    metrics
                        .partition_lag
                        .insert(key.clone(), partition.consumer_lag);
                }

                // Committed offset
                if partition.committed_offset >= 0 {
                    metrics
                        .partition_committed
                        .insert(key.clone(), partition.committed_offset);
                }

                // High watermark
                if partition.hi_offset >= 0 {
                    metrics
                        .partition_high_watermark
                        .insert(key, partition.hi_offset);
                }
            }
        }

        // Consumer group metrics
        if let Some(ref cgrp) = stats.cgrp {
            metrics.consumer_group_state = Some(cgrp.state.clone());
            metrics.rebalance_count = cgrp.rebalance_cnt;
            metrics.rebalance_age_ms = cgrp.rebalance_age;
        }

        metrics
    }
}

impl ClientContext for StatsContext {
    fn stats(&self, statistics: Statistics) {
        // Convert and store metrics
        let metrics = Self::convert_stats(&statistics);

        if let Ok(mut lock) = self.latest_metrics.write() {
            *lock = metrics;
        }

        // Also store raw stats
        if let Ok(mut lock) = self.stats.write() {
            *lock = Some(statistics);
        }

        // Auto-emit as Prometheus metrics if a recorder is installed
        #[cfg(feature = "metrics")]
        self.emit_prometheus_metrics();
    }

    fn log(&self, level: RDKafkaLogLevel, fac: &str, log_message: &str) {
        // Forward to tracing if available
        match level {
            RDKafkaLogLevel::Emerg
            | RDKafkaLogLevel::Alert
            | RDKafkaLogLevel::Critical
            | RDKafkaLogLevel::Error => {
                #[cfg(feature = "logger")]
                tracing::error!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                eprintln!("ERROR librdkafka: {} {}", fac, log_message);
            }
            RDKafkaLogLevel::Warning => {
                #[cfg(feature = "logger")]
                tracing::warn!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                eprintln!("WARN librdkafka: {} {}", fac, log_message);
            }
            RDKafkaLogLevel::Notice | RDKafkaLogLevel::Info => {
                // rdkafka INFO/Notice is too verbose for application-level INFO
                // (statistics JSON every statistics.interval.ms, connection lifecycle, etc.)
                #[cfg(feature = "logger")]
                tracing::debug!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                {}
            }
            RDKafkaLogLevel::Debug => {
                #[cfg(feature = "logger")]
                tracing::debug!(target: "librdkafka", facility = fac, "{}", log_message);
                #[cfg(not(feature = "logger"))]
                {}
            }
        }
    }

    fn error(&self, error: KafkaError, reason: &str) {
        #[cfg(feature = "logger")]
        tracing::error!(target: "librdkafka", error = %error, "{}", reason);
        #[cfg(not(feature = "logger"))]
        eprintln!("ERROR librdkafka: {}: {}", error, reason);
    }
}

impl StatsContext {
    /// Emit current metrics as Prometheus gauges via the `metrics` crate.
    ///
    /// Call periodically (e.g. after each stats callback) to push rdkafka
    /// internal statistics to the global metrics recorder. No-op if no
    /// recorder is installed.
    ///
    /// Emits under the `rdkafka_` prefix per the DFE metrics standard.
    /// Per-partition series are capped at 256 partitions to bound label cardinality.
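    ///
    /// A global recorder must be installed before these calls have any effect.
    /// A minimal sketch, assuming the `metrics-exporter-prometheus` crate is
    /// used as the recorder (any `metrics`-compatible recorder works):
    ///
    /// ```rust,ignore
    /// // Install a global Prometheus recorder/exporter once at startup.
    /// metrics_exporter_prometheus::PrometheusBuilder::new().install()?;
    ///
    /// // Later, e.g. on a timer or after each stats callback:
    /// stats_context.emit_prometheus_metrics();
    /// ```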
    #[cfg(feature = "metrics")]
    pub fn emit_prometheus_metrics(&self) {
        let m = self.get_metrics();

        // Global queue gauges
        metrics::gauge!("rdkafka_global_msg_cnt").set(m.queue_message_count as f64);
        metrics::gauge!("rdkafka_global_msg_size_bytes").set(m.queue_byte_count as f64);

        // Per-broker metrics
        for (name, broker) in &m.brokers {
            metrics::gauge!(
                "rdkafka_broker_rtt_avg_seconds",
                "broker" => name.clone()
            )
            .set(broker.rtt_avg_ms / 1000.0);

            metrics::gauge!(
                "rdkafka_broker_outbuf_cnt",
                "broker" => name.clone()
            )
            .set(broker.outbuf_msg_cnt as f64);

            metrics::gauge!(
                "rdkafka_broker_waitresp_cnt",
                "broker" => name.clone()
            )
            .set(broker.waitresp_cnt as f64);
        }

        // Per-partition consumer lag (capped at 256 partitions for cardinality safety)
        let max_partitions = 256;
        for (i, ((topic, partition), lag)) in m.partition_lag.iter().enumerate() {
            if i >= max_partitions {
                break;
            }
            metrics::gauge!(
                "rdkafka_topic_partition_consumer_lag",
                "topic" => topic.clone(),
                "partition" => partition.to_string()
            )
            .set(*lag as f64);
        }

        for (i, ((topic, partition), offset)) in m.partition_committed.iter().enumerate() {
            if i >= max_partitions {
                break;
            }
            metrics::gauge!(
                "rdkafka_topic_partition_committed_offset",
                "topic" => topic.clone(),
                "partition" => partition.to_string()
            )
            .set(*offset as f64);
        }

        // Rebalance count
        if m.rebalance_count > 0 {
            metrics::gauge!("rdkafka_consumer_rebalance_count").set(m.rebalance_count as f64);
        }
    }
}

// StatsContext can be used as a ConsumerContext and ProducerContext
impl rdkafka::consumer::ConsumerContext for StatsContext {}

impl rdkafka::producer::ProducerContext for StatsContext {
    type DeliveryOpaque = ();

    fn delivery(
        &self,
        _result: &rdkafka::producer::DeliveryResult<'_>,
        _opaque: Self::DeliveryOpaque,
    ) {
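        // Delivery reports are intentionally ignored; send/receive counts come
        // from the librdkafka statistics callback instead.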
    }
}

/// Calculate total consumer lag across all partitions.
///
/// Helper function to sum lag from a `KafkaMetrics` snapshot.
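///
/// For example, to flag a consumer that is falling behind (the threshold is
/// illustrative only):
///
/// ```rust,ignore
/// let metrics = stats_context.get_metrics();
/// if total_consumer_lag(&metrics) > 10_000 {
///     // e.g. surface an alert or scale the consumer group
/// }
/// ```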
#[must_use]
pub fn total_consumer_lag(metrics: &KafkaMetrics) -> i64 {
    metrics.partition_lag.values().sum()
}

/// Count brokers in the "UP" state.
#[must_use]
pub fn healthy_broker_count(metrics: &KafkaMetrics) -> usize {
    metrics.brokers.values().filter(|b| b.state == "UP").count()
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_stats_context_creation() {
        let ctx = StatsContext::new();
        let metrics = ctx.get_metrics();
        assert_eq!(metrics.messages_sent, 0);
        assert_eq!(metrics.messages_received, 0);
    }

    #[test]
    fn test_kafka_metrics_default() {
        let metrics = KafkaMetrics::default();
        assert_eq!(metrics.messages_sent, 0);
        assert!(metrics.brokers.is_empty());
        assert!(metrics.partition_lag.is_empty());
    }

    #[test]
    fn test_broker_metrics_default() {
        let metrics = BrokerMetrics::default();
        assert_eq!(metrics.state, "");
        assert!(metrics.rtt_avg_ms.abs() < f64::EPSILON);
    }

    #[test]
    fn test_total_consumer_lag() {
        let mut metrics = KafkaMetrics::default();
        metrics.partition_lag.insert(("topic".to_string(), 0), 100);
        metrics.partition_lag.insert(("topic".to_string(), 1), 200);
        metrics.partition_lag.insert(("topic".to_string(), 2), 50);

        assert_eq!(total_consumer_lag(&metrics), 350);
    }

    #[test]
    fn test_healthy_broker_count() {
        let mut metrics = KafkaMetrics::default();
        metrics.brokers.insert(
            "broker1".to_string(),
            BrokerMetrics {
                state: "UP".to_string(),
                ..Default::default()
            },
        );
        metrics.brokers.insert(
            "broker2".to_string(),
            BrokerMetrics {
                state: "DOWN".to_string(),
                ..Default::default()
            },
        );
        metrics.brokers.insert(
            "broker3".to_string(),
            BrokerMetrics {
                state: "UP".to_string(),
                ..Default::default()
            },
        );

        assert_eq!(healthy_broker_count(&metrics), 2);
    }

    #[test]
    fn test_stats_context_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<StatsContext>();
    }
}