1use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
7#[derive(Debug)]
9pub struct KafkaMetrics {
10 pub connections_total: AtomicU64,
12 pub connections_active: AtomicU64,
14 pub requests_total: AtomicU64,
16 pub requests_by_api: HashMap<i16, AtomicU64>,
18 pub responses_total: AtomicU64,
20 pub messages_produced_total: AtomicU64,
22 pub messages_consumed_total: AtomicU64,
24 pub topics_created_total: AtomicU64,
26 pub topics_deleted_total: AtomicU64,
28 pub consumer_groups_total: AtomicU64,
30 pub partitions_total: AtomicU64,
32 pub request_latency_micros: AtomicU64,
34 pub errors_total: AtomicU64,
36}
37
38impl KafkaMetrics {
39 pub fn new() -> Self {
41 let mut requests_by_api = HashMap::new();
42 for api_key in &[0, 1, 3, 9, 15, 16, 18, 19, 20, 32, 49] {
44 requests_by_api.insert(*api_key, AtomicU64::new(0));
45 }
46
47 Self {
48 connections_total: AtomicU64::new(0),
49 connections_active: AtomicU64::new(0),
50 requests_total: AtomicU64::new(0),
51 requests_by_api,
52 responses_total: AtomicU64::new(0),
53 messages_produced_total: AtomicU64::new(0),
54 messages_consumed_total: AtomicU64::new(0),
55 topics_created_total: AtomicU64::new(0),
56 topics_deleted_total: AtomicU64::new(0),
57 consumer_groups_total: AtomicU64::new(0),
58 partitions_total: AtomicU64::new(0),
59 request_latency_micros: AtomicU64::new(0),
60 errors_total: AtomicU64::new(0),
61 }
62 }
63
64 pub fn record_connection(&self) {
66 self.connections_total.fetch_add(1, Ordering::Relaxed);
67 self.connections_active.fetch_add(1, Ordering::Relaxed);
68 }
69
70 pub fn record_connection_closed(&self) {
72 self.connections_active.fetch_sub(1, Ordering::Relaxed);
73 }
74
75 pub fn record_request(&self, api_key: i16) {
77 self.requests_total.fetch_add(1, Ordering::Relaxed);
78 if let Some(counter) = self.requests_by_api.get(&api_key) {
79 counter.fetch_add(1, Ordering::Relaxed);
80 }
81 }
82
83 pub fn record_response(&self) {
85 self.responses_total.fetch_add(1, Ordering::Relaxed);
86 }
87
88 pub fn record_messages_produced(&self, count: u64) {
90 self.messages_produced_total.fetch_add(count, Ordering::Relaxed);
91 }
92
93 pub fn record_messages_consumed(&self, count: u64) {
95 self.messages_consumed_total.fetch_add(count, Ordering::Relaxed);
96 }
97
98 pub fn record_topic_created(&self) {
100 self.topics_created_total.fetch_add(1, Ordering::Relaxed);
101 }
102
103 pub fn record_topic_deleted(&self) {
105 self.topics_deleted_total.fetch_add(1, Ordering::Relaxed);
106 }
107
108 pub fn record_consumer_group_created(&self) {
110 self.consumer_groups_total.fetch_add(1, Ordering::Relaxed);
111 }
112
113 pub fn record_partition_created(&self) {
115 self.partitions_total.fetch_add(1, Ordering::Relaxed);
116 }
117
118 pub fn record_request_latency(&self, latency_micros: u64) {
120 let current = self.request_latency_micros.load(Ordering::Relaxed);
122 let new_avg = (current + latency_micros) / 2;
123 self.request_latency_micros.store(new_avg, Ordering::Relaxed);
124 }
125
126 pub fn record_error(&self) {
128 self.errors_total.fetch_add(1, Ordering::Relaxed);
129 }
130
131 pub fn snapshot(&self) -> MetricsSnapshot {
133 MetricsSnapshot {
134 connections_total: self.connections_total.load(Ordering::Relaxed),
135 connections_active: self.connections_active.load(Ordering::Relaxed),
136 requests_total: self.requests_total.load(Ordering::Relaxed),
137 responses_total: self.responses_total.load(Ordering::Relaxed),
138 messages_produced_total: self.messages_produced_total.load(Ordering::Relaxed),
139 messages_consumed_total: self.messages_consumed_total.load(Ordering::Relaxed),
140 topics_created_total: self.topics_created_total.load(Ordering::Relaxed),
141 topics_deleted_total: self.topics_deleted_total.load(Ordering::Relaxed),
142 consumer_groups_total: self.consumer_groups_total.load(Ordering::Relaxed),
143 partitions_total: self.partitions_total.load(Ordering::Relaxed),
144 avg_request_latency_micros: self.request_latency_micros.load(Ordering::Relaxed),
145 errors_total: self.errors_total.load(Ordering::Relaxed),
146 }
147 }
148}
149
150#[derive(Debug, Clone)]
152pub struct MetricsSnapshot {
153 pub connections_total: u64,
154 pub connections_active: u64,
155 pub requests_total: u64,
156 pub responses_total: u64,
157 pub messages_produced_total: u64,
158 pub messages_consumed_total: u64,
159 pub topics_created_total: u64,
160 pub topics_deleted_total: u64,
161 pub consumer_groups_total: u64,
162 pub partitions_total: u64,
163 pub avg_request_latency_micros: u64,
164 pub errors_total: u64,
165}
166
167impl Default for KafkaMetrics {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
173pub struct MetricsExporter {
175 metrics: Arc<KafkaMetrics>,
176}
177
178impl MetricsExporter {
179 pub fn new(metrics: Arc<KafkaMetrics>) -> Self {
181 Self { metrics }
182 }
183
184 pub fn export_prometheus(&self) -> String {
186 let snapshot = self.metrics.snapshot();
187
188 format!(
189 "# HELP kafka_connections_total Total number of connections\n\
190 # TYPE kafka_connections_total counter\n\
191 kafka_connections_total {}\n\
192 # HELP kafka_connections_active Number of active connections\n\
193 # TYPE kafka_connections_active gauge\n\
194 kafka_connections_active {}\n\
195 # HELP kafka_requests_total Total number of requests\n\
196 # TYPE kafka_requests_total counter\n\
197 kafka_requests_total {}\n\
198 # HELP kafka_responses_total Total number of responses\n\
199 # TYPE kafka_responses_total counter\n\
200 kafka_responses_total {}\n\
201 # HELP kafka_messages_produced_total Total messages produced\n\
202 # TYPE kafka_messages_produced_total counter\n\
203 kafka_messages_produced_total {}\n\
204 # HELP kafka_messages_consumed_total Total messages consumed\n\
205 # TYPE kafka_messages_consumed_total counter\n\
206 kafka_messages_consumed_total {}\n\
207 # HELP kafka_topics_created_total Total topics created\n\
208 # TYPE kafka_topics_created_total counter\n\
209 kafka_topics_created_total {}\n\
210 # HELP kafka_topics_deleted_total Total topics deleted\n\
211 # TYPE kafka_topics_deleted_total counter\n\
212 kafka_topics_deleted_total {}\n\
213 # HELP kafka_consumer_groups_total Total consumer groups\n\
214 # TYPE kafka_consumer_groups_total gauge\n\
215 kafka_consumer_groups_total {}\n\
216 # HELP kafka_partitions_total Total partitions\n\
217 # TYPE kafka_partitions_total gauge\n\
218 kafka_partitions_total {}\n\
219 # HELP kafka_request_latency_micros_avg Average request latency in microseconds\n\
220 # TYPE kafka_request_latency_micros_avg gauge\n\
221 kafka_request_latency_micros_avg {}\n\
222 # HELP kafka_errors_total Total errors\n\
223 # TYPE kafka_errors_total counter\n\
224 kafka_errors_total {}\n",
225 snapshot.connections_total,
226 snapshot.connections_active,
227 snapshot.requests_total,
228 snapshot.responses_total,
229 snapshot.messages_produced_total,
230 snapshot.messages_consumed_total,
231 snapshot.topics_created_total,
232 snapshot.topics_deleted_total,
233 snapshot.consumer_groups_total,
234 snapshot.partitions_total,
235 snapshot.avg_request_latency_micros,
236 snapshot.errors_total
237 )
238 }
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244
245 #[test]
246 fn test_kafka_metrics_new() {
247 let metrics = KafkaMetrics::new();
248 assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 0);
249 assert_eq!(metrics.connections_active.load(Ordering::Relaxed), 0);
250 assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 0);
251 }
252
253 #[test]
254 fn test_kafka_metrics_default() {
255 let metrics = KafkaMetrics::default();
256 assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 0);
257 }
258
259 #[test]
260 fn test_record_connection() {
261 let metrics = KafkaMetrics::new();
262 metrics.record_connection();
263 assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 1);
264 assert_eq!(metrics.connections_active.load(Ordering::Relaxed), 1);
265 }
266
267 #[test]
268 fn test_record_connection_closed() {
269 let metrics = KafkaMetrics::new();
270 metrics.record_connection();
271 metrics.record_connection();
272 metrics.record_connection_closed();
273 assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 2);
274 assert_eq!(metrics.connections_active.load(Ordering::Relaxed), 1);
275 }
276
277 #[test]
278 fn test_record_request() {
279 let metrics = KafkaMetrics::new();
280 metrics.record_request(0); metrics.record_request(1); metrics.record_request(0);
283 assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 3);
284 }
285
286 #[test]
287 fn test_record_request_unknown_api() {
288 let metrics = KafkaMetrics::new();
289 metrics.record_request(999); assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 1);
291 }
292
293 #[test]
294 fn test_record_response() {
295 let metrics = KafkaMetrics::new();
296 metrics.record_response();
297 metrics.record_response();
298 assert_eq!(metrics.responses_total.load(Ordering::Relaxed), 2);
299 }
300
301 #[test]
302 fn test_record_messages_produced() {
303 let metrics = KafkaMetrics::new();
304 metrics.record_messages_produced(10);
305 metrics.record_messages_produced(5);
306 assert_eq!(metrics.messages_produced_total.load(Ordering::Relaxed), 15);
307 }
308
309 #[test]
310 fn test_record_messages_consumed() {
311 let metrics = KafkaMetrics::new();
312 metrics.record_messages_consumed(20);
313 assert_eq!(metrics.messages_consumed_total.load(Ordering::Relaxed), 20);
314 }
315
316 #[test]
317 fn test_record_topic_created() {
318 let metrics = KafkaMetrics::new();
319 metrics.record_topic_created();
320 metrics.record_topic_created();
321 assert_eq!(metrics.topics_created_total.load(Ordering::Relaxed), 2);
322 }
323
324 #[test]
325 fn test_record_topic_deleted() {
326 let metrics = KafkaMetrics::new();
327 metrics.record_topic_deleted();
328 assert_eq!(metrics.topics_deleted_total.load(Ordering::Relaxed), 1);
329 }
330
331 #[test]
332 fn test_record_consumer_group_created() {
333 let metrics = KafkaMetrics::new();
334 metrics.record_consumer_group_created();
335 assert_eq!(metrics.consumer_groups_total.load(Ordering::Relaxed), 1);
336 }
337
338 #[test]
339 fn test_record_partition_created() {
340 let metrics = KafkaMetrics::new();
341 metrics.record_partition_created();
342 metrics.record_partition_created();
343 metrics.record_partition_created();
344 assert_eq!(metrics.partitions_total.load(Ordering::Relaxed), 3);
345 }
346
347 #[test]
348 fn test_record_request_latency() {
349 let metrics = KafkaMetrics::new();
350 metrics.record_request_latency(100);
351 metrics.record_request_latency(200);
352 let latency = metrics.request_latency_micros.load(Ordering::Relaxed);
354 assert!(latency > 0);
355 }
356
357 #[test]
358 fn test_record_error() {
359 let metrics = KafkaMetrics::new();
360 metrics.record_error();
361 metrics.record_error();
362 assert_eq!(metrics.errors_total.load(Ordering::Relaxed), 2);
363 }
364
365 #[test]
366 fn test_snapshot() {
367 let metrics = KafkaMetrics::new();
368 metrics.record_connection();
369 metrics.record_request(0);
370 metrics.record_messages_produced(5);
371
372 let snapshot = metrics.snapshot();
373 assert_eq!(snapshot.connections_total, 1);
374 assert_eq!(snapshot.connections_active, 1);
375 assert_eq!(snapshot.requests_total, 1);
376 assert_eq!(snapshot.messages_produced_total, 5);
377 }
378
379 #[test]
380 fn test_snapshot_clone() {
381 let metrics = KafkaMetrics::new();
382 metrics.record_connection();
383
384 let snapshot = metrics.snapshot();
385 let cloned = snapshot.clone();
386 assert_eq!(snapshot.connections_total, cloned.connections_total);
387 }
388
389 #[test]
390 fn test_metrics_exporter_new() {
391 let metrics = Arc::new(KafkaMetrics::new());
392 let _exporter = MetricsExporter::new(metrics);
393 }
394
395 #[test]
396 fn test_metrics_exporter_prometheus() {
397 let metrics = Arc::new(KafkaMetrics::new());
398 metrics.record_connection();
399 metrics.record_messages_produced(100);
400
401 let exporter = MetricsExporter::new(metrics);
402 let output = exporter.export_prometheus();
403
404 assert!(output.contains("kafka_connections_total 1"));
405 assert!(output.contains("kafka_messages_produced_total 100"));
406 assert!(output.contains("# HELP"));
407 assert!(output.contains("# TYPE"));
408 }
409
410 #[test]
411 fn test_kafka_metrics_debug() {
412 let metrics = KafkaMetrics::new();
413 let debug = format!("{:?}", metrics);
414 assert!(debug.contains("KafkaMetrics"));
415 }
416
417 #[test]
418 fn test_metrics_snapshot_debug() {
419 let metrics = KafkaMetrics::new();
420 let snapshot = metrics.snapshot();
421 let debug = format!("{:?}", snapshot);
422 assert!(debug.contains("MetricsSnapshot"));
423 }
424}