1use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
/// Kafka-related counters and gauges, stored as atomic integers so they can be
/// updated through a shared reference (every recording method takes `&self`).
///
/// Use the `record_*` methods to update values and `snapshot()` to read them
/// out as plain integers.
#[derive(Debug)]
pub struct KafkaMetrics {
    /// Number of connections recorded via `record_connection`.
    pub connections_total: AtomicU64,
    /// Incremented by `record_connection`, decremented by `record_connection_closed`.
    pub connections_active: AtomicU64,
    /// Incremented once per `record_request` call, regardless of API key.
    pub requests_total: AtomicU64,
    /// Per-API-key request counters.
    ///
    /// The key set is fixed when the map is built in `new()`; `record_request`
    /// only bumps keys that are already present, so requests for any other key
    /// show up in `requests_total` alone. This map is not part of `snapshot()`.
    pub requests_by_api: HashMap<i16, AtomicU64>,
    /// Incremented once per `record_response` call.
    pub responses_total: AtomicU64,
    /// Sum of the counts passed to `record_messages_produced`.
    pub messages_produced_total: AtomicU64,
    /// Sum of the counts passed to `record_messages_consumed`.
    pub messages_consumed_total: AtomicU64,
    /// Incremented once per `record_topic_created` call.
    pub topics_created_total: AtomicU64,
    /// Incremented once per `record_topic_deleted` call.
    pub topics_deleted_total: AtomicU64,
    /// Incremented once per `record_consumer_group_created` call; nothing in
    /// this type's methods decrements it.
    pub consumer_groups_total: AtomicU64,
    /// Incremented once per `record_partition_created` call; nothing in this
    /// type's methods decrements it.
    pub partitions_total: AtomicU64,
    /// Smoothed request latency in microseconds.
    ///
    /// Each `record_request_latency` call replaces the stored value with the
    /// integer mean of the previous value and the new sample. Reported as
    /// `avg_request_latency_micros` in snapshots.
    pub request_latency_micros: AtomicU64,
    /// Incremented once per `record_error` call.
    pub errors_total: AtomicU64,
}
37
38impl KafkaMetrics {
39 pub fn new() -> Self {
41 let mut requests_by_api = HashMap::new();
42 for api_key in &[0, 1, 3, 9, 15, 16, 18, 19, 20, 32, 49] {
44 requests_by_api.insert(*api_key, AtomicU64::new(0));
45 }
46
47 Self {
48 connections_total: AtomicU64::new(0),
49 connections_active: AtomicU64::new(0),
50 requests_total: AtomicU64::new(0),
51 requests_by_api,
52 responses_total: AtomicU64::new(0),
53 messages_produced_total: AtomicU64::new(0),
54 messages_consumed_total: AtomicU64::new(0),
55 topics_created_total: AtomicU64::new(0),
56 topics_deleted_total: AtomicU64::new(0),
57 consumer_groups_total: AtomicU64::new(0),
58 partitions_total: AtomicU64::new(0),
59 request_latency_micros: AtomicU64::new(0),
60 errors_total: AtomicU64::new(0),
61 }
62 }
63
64 pub fn record_connection(&self) {
66 self.connections_total.fetch_add(1, Ordering::Relaxed);
67 self.connections_active.fetch_add(1, Ordering::Relaxed);
68 }
69
70 pub fn record_connection_closed(&self) {
72 self.connections_active.fetch_sub(1, Ordering::Relaxed);
73 }
74
75 pub fn record_request(&self, api_key: i16) {
77 self.requests_total.fetch_add(1, Ordering::Relaxed);
78 if let Some(counter) = self.requests_by_api.get(&api_key) {
79 counter.fetch_add(1, Ordering::Relaxed);
80 }
81 }
82
83 pub fn record_response(&self) {
85 self.responses_total.fetch_add(1, Ordering::Relaxed);
86 }
87
88 pub fn record_messages_produced(&self, count: u64) {
90 self.messages_produced_total.fetch_add(count, Ordering::Relaxed);
91 }
92
93 pub fn record_messages_consumed(&self, count: u64) {
95 self.messages_consumed_total.fetch_add(count, Ordering::Relaxed);
96 }
97
98 pub fn record_topic_created(&self) {
100 self.topics_created_total.fetch_add(1, Ordering::Relaxed);
101 }
102
103 pub fn record_topic_deleted(&self) {
105 self.topics_deleted_total.fetch_add(1, Ordering::Relaxed);
106 }
107
108 pub fn record_consumer_group_created(&self) {
110 self.consumer_groups_total.fetch_add(1, Ordering::Relaxed);
111 }
112
113 pub fn record_partition_created(&self) {
115 self.partitions_total.fetch_add(1, Ordering::Relaxed);
116 }
117
118 pub fn record_request_latency(&self, latency_micros: u64) {
120 let current = self.request_latency_micros.load(Ordering::Relaxed);
122 let new_avg = (current + latency_micros) / 2;
123 self.request_latency_micros.store(new_avg, Ordering::Relaxed);
124 }
125
126 pub fn record_error(&self) {
128 self.errors_total.fetch_add(1, Ordering::Relaxed);
129 }
130
131 pub fn snapshot(&self) -> MetricsSnapshot {
133 MetricsSnapshot {
134 connections_total: self.connections_total.load(Ordering::Relaxed),
135 connections_active: self.connections_active.load(Ordering::Relaxed),
136 requests_total: self.requests_total.load(Ordering::Relaxed),
137 responses_total: self.responses_total.load(Ordering::Relaxed),
138 messages_produced_total: self.messages_produced_total.load(Ordering::Relaxed),
139 messages_consumed_total: self.messages_consumed_total.load(Ordering::Relaxed),
140 topics_created_total: self.topics_created_total.load(Ordering::Relaxed),
141 topics_deleted_total: self.topics_deleted_total.load(Ordering::Relaxed),
142 consumer_groups_total: self.consumer_groups_total.load(Ordering::Relaxed),
143 partitions_total: self.partitions_total.load(Ordering::Relaxed),
144 avg_request_latency_micros: self.request_latency_micros.load(Ordering::Relaxed),
145 errors_total: self.errors_total.load(Ordering::Relaxed),
146 }
147 }
148}
149
/// Plain-integer copy of the scalar metrics, as produced by
/// `KafkaMetrics::snapshot()`.
///
/// Each field holds the value loaded from the `KafkaMetrics` field of the same
/// name, except `avg_request_latency_micros`, which is loaded from
/// `request_latency_micros`. Per-API request counters are not included.
///
/// Snapshots can be compared with `==` and a zeroed snapshot is available via
/// `Default`, which makes before/after checks straightforward.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MetricsSnapshot {
    /// Connections recorded so far.
    pub connections_total: u64,
    /// Connections recorded as open and not yet recorded as closed.
    pub connections_active: u64,
    /// Requests recorded so far, across all API keys.
    pub requests_total: u64,
    /// Responses recorded so far.
    pub responses_total: u64,
    /// Sum of all produced-message counts recorded so far.
    pub messages_produced_total: u64,
    /// Sum of all consumed-message counts recorded so far.
    pub messages_consumed_total: u64,
    /// Topic creations recorded so far.
    pub topics_created_total: u64,
    /// Topic deletions recorded so far.
    pub topics_deleted_total: u64,
    /// Consumer-group creations recorded so far.
    pub consumer_groups_total: u64,
    /// Partition creations recorded so far.
    pub partitions_total: u64,
    /// Smoothed request latency in microseconds at the time of the snapshot.
    pub avg_request_latency_micros: u64,
    /// Errors recorded so far.
    pub errors_total: u64,
}
166
167impl Default for KafkaMetrics {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
/// Renders the metrics held in a shared [`KafkaMetrics`] as text
/// (see `export_prometheus`).
pub struct MetricsExporter {
    /// Shared handle to the metrics; `export_prometheus` takes a fresh snapshot
    /// from it on every call.
    metrics: Arc<KafkaMetrics>,
}
177
178impl MetricsExporter {
179 pub fn new(metrics: Arc<KafkaMetrics>) -> Self {
181 Self { metrics }
182 }
183
184 pub fn export_prometheus(&self) -> String {
186 let snapshot = self.metrics.snapshot();
187
188 format!(
189 "# HELP kafka_connections_total Total number of connections\n\
190 # TYPE kafka_connections_total counter\n\
191 kafka_connections_total {}\n\
192 # HELP kafka_connections_active Number of active connections\n\
193 # TYPE kafka_connections_active gauge\n\
194 kafka_connections_active {}\n\
195 # HELP kafka_requests_total Total number of requests\n\
196 # TYPE kafka_requests_total counter\n\
197 kafka_requests_total {}\n\
198 # HELP kafka_responses_total Total number of responses\n\
199 # TYPE kafka_responses_total counter\n\
200 kafka_responses_total {}\n\
201 # HELP kafka_messages_produced_total Total messages produced\n\
202 # TYPE kafka_messages_produced_total counter\n\
203 kafka_messages_produced_total {}\n\
204 # HELP kafka_messages_consumed_total Total messages consumed\n\
205 # TYPE kafka_messages_consumed_total counter\n\
206 kafka_messages_consumed_total {}\n\
207 # HELP kafka_topics_created_total Total topics created\n\
208 # TYPE kafka_topics_created_total counter\n\
209 kafka_topics_created_total {}\n\
210 # HELP kafka_topics_deleted_total Total topics deleted\n\
211 # TYPE kafka_topics_deleted_total counter\n\
212 kafka_topics_deleted_total {}\n\
213 # HELP kafka_consumer_groups_total Total consumer groups\n\
214 # TYPE kafka_consumer_groups_total gauge\n\
215 kafka_consumer_groups_total {}\n\
216 # HELP kafka_partitions_total Total partitions\n\
217 # TYPE kafka_partitions_total gauge\n\
218 kafka_partitions_total {}\n\
219 # HELP kafka_request_latency_micros_avg Average request latency in microseconds\n\
220 # TYPE kafka_request_latency_micros_avg gauge\n\
221 kafka_request_latency_micros_avg {}\n\
222 # HELP kafka_errors_total Total errors\n\
223 # TYPE kafka_errors_total counter\n\
224 kafka_errors_total {}\n",
225 snapshot.connections_total,
226 snapshot.connections_active,
227 snapshot.requests_total,
228 snapshot.responses_total,
229 snapshot.messages_produced_total,
230 snapshot.messages_consumed_total,
231 snapshot.topics_created_total,
232 snapshot.topics_deleted_total,
233 snapshot.consumer_groups_total,
234 snapshot.partitions_total,
235 snapshot.avg_request_latency_micros,
236 snapshot.errors_total
237 )
238 }
239}