hyperi_rustlib/metrics/dfe_groups/
consumer.rs1use metrics::{Counter, Gauge, Histogram};
12
13use super::super::MetricsManager;
14use super::super::manifest::{MetricDescriptor, MetricType};
15
16#[derive(Clone)]
21pub struct ConsumerMetrics {
22 pub partitions_assigned: Gauge,
23 pub rebalance: Counter,
24 pub poll_duration: Histogram,
25 pub offsets_committed: Counter,
26 namespace: String,
27}
28
29impl ConsumerMetrics {
30 #[must_use]
31 pub fn new(manager: &MetricsManager) -> Self {
32 let ns = manager.namespace();
33
34 let lag_key = if ns.is_empty() {
36 "consumer_lag".to_string()
37 } else {
38 format!("{ns}_consumer_lag")
39 };
40 metrics::describe_gauge!(lag_key.clone(), "Kafka consumer lag per topic/partition");
41 manager.registry().push(MetricDescriptor {
42 name: lag_key,
43 metric_type: MetricType::Gauge,
44 description: "Kafka consumer lag per topic/partition".into(),
45 unit: String::new(),
46 labels: vec!["topic".into(), "partition".into()],
47 group: "consumer".into(),
48 buckets: None,
49 use_cases: vec![],
50 dashboard_hint: None,
51 });
52
53 Self {
54 partitions_assigned: manager.gauge_with_labels(
55 "consumer_partitions_assigned",
56 "Current assigned partition count",
57 &[],
58 "consumer",
59 ),
60 rebalance: manager.counter_with_labels(
61 "consumer_rebalance_total",
62 "Consumer group rebalance events",
63 &[],
64 "consumer",
65 ),
66 poll_duration: manager.histogram_with_labels(
67 "consumer_poll_duration_seconds",
68 "Time per Kafka poll/recv call",
69 &[],
70 "consumer",
71 None,
72 ),
73 offsets_committed: manager.counter_with_labels(
74 "offsets_committed_total",
75 "Kafka offsets committed after successful processing",
76 &[],
77 "consumer",
78 ),
79 namespace: ns.to_string(),
80 }
81 }
82
83 #[inline]
85 pub fn set_lag(&self, topic: &str, partition: i32, lag: i64) {
86 let key = if self.namespace.is_empty() {
87 "consumer_lag".to_string()
88 } else {
89 format!("{}_consumer_lag", self.namespace)
90 };
91 #[allow(clippy::cast_precision_loss)]
92 metrics::gauge!(
93 key,
94 "topic" => topic.to_string(),
95 "partition" => partition.to_string()
96 )
97 .set(lag as f64);
98 }
99
100 #[inline]
101 pub fn set_partitions_assigned(&self, count: usize) {
102 self.partitions_assigned.set(count as f64);
103 }
104
105 #[inline]
106 pub fn record_rebalance(&self) {
107 self.rebalance.increment(1);
108 }
109
110 #[inline]
111 pub fn record_poll_duration(&self, seconds: f64) {
112 self.poll_duration.record(seconds);
113 }
114
115 #[inline]
116 pub fn record_offsets_committed(&self, count: u64) {
117 self.offsets_committed.increment(count);
118 }
119}