Skip to main content

hyperi_rustlib/metrics/dfe_groups/
consumer.rs

1// Project:   hyperi-rustlib
2// File:      src/metrics/dfe_groups/consumer.rs
3// Purpose:   DFE consumer metrics group
4// Language:  Rust
5//
6// License:   BUSL-1.1
7// Copyright: (c) 2026 HYPERI PTY LIMITED
8
9//! Kafka consumer metrics for DFE apps.
10
11use metrics::{Counter, Gauge, Histogram};
12
13use super::super::MetricsManager;
14use super::super::manifest::{MetricDescriptor, MetricType};
15
16/// Kafka consumer metrics.
17///
18/// Tracks consumer lag, partition assignments, rebalances, poll latency,
19/// and offset commits.
20#[derive(Clone)]
21pub struct ConsumerMetrics {
22    pub partitions_assigned: Gauge,
23    pub rebalance: Counter,
24    pub poll_duration: Histogram,
25    pub offsets_committed: Counter,
26    namespace: String,
27}
28
29impl ConsumerMetrics {
30    #[must_use]
31    pub fn new(manager: &MetricsManager) -> Self {
32        let ns = manager.namespace();
33
34        // consumer_lag -- label-based, register descriptor manually
35        let lag_key = if ns.is_empty() {
36            "consumer_lag".to_string()
37        } else {
38            format!("{ns}_consumer_lag")
39        };
40        metrics::describe_gauge!(lag_key.clone(), "Kafka consumer lag per topic/partition");
41        manager.registry().push(MetricDescriptor {
42            name: lag_key,
43            metric_type: MetricType::Gauge,
44            description: "Kafka consumer lag per topic/partition".into(),
45            unit: String::new(),
46            labels: vec!["topic".into(), "partition".into()],
47            group: "consumer".into(),
48            buckets: None,
49            use_cases: vec![],
50            dashboard_hint: None,
51        });
52
53        Self {
54            partitions_assigned: manager.gauge_with_labels(
55                "consumer_partitions_assigned",
56                "Current assigned partition count",
57                &[],
58                "consumer",
59            ),
60            rebalance: manager.counter_with_labels(
61                "consumer_rebalance_total",
62                "Consumer group rebalance events",
63                &[],
64                "consumer",
65            ),
66            poll_duration: manager.histogram_with_labels(
67                "consumer_poll_duration_seconds",
68                "Time per Kafka poll/recv call",
69                &[],
70                "consumer",
71                None,
72            ),
73            offsets_committed: manager.counter_with_labels(
74                "offsets_committed_total",
75                "Kafka offsets committed after successful processing",
76                &[],
77                "consumer",
78            ),
79            namespace: ns.to_string(),
80        }
81    }
82
83    /// Set consumer lag for a specific topic/partition.
84    #[inline]
85    pub fn set_lag(&self, topic: &str, partition: i32, lag: i64) {
86        let key = if self.namespace.is_empty() {
87            "consumer_lag".to_string()
88        } else {
89            format!("{}_consumer_lag", self.namespace)
90        };
91        #[allow(clippy::cast_precision_loss)]
92        metrics::gauge!(
93            key,
94            "topic" => topic.to_string(),
95            "partition" => partition.to_string()
96        )
97        .set(lag as f64);
98    }
99
100    #[inline]
101    pub fn set_partitions_assigned(&self, count: usize) {
102        self.partitions_assigned.set(count as f64);
103    }
104
105    #[inline]
106    pub fn record_rebalance(&self) {
107        self.rebalance.increment(1);
108    }
109
110    #[inline]
111    pub fn record_poll_duration(&self, seconds: f64) {
112        self.poll_duration.record(seconds);
113    }
114
115    #[inline]
116    pub fn record_offsets_committed(&self, count: u64) {
117        self.offsets_committed.increment(count);
118    }
119}