mockforge_kafka/
metrics.rs

1//! Metrics and monitoring for Kafka broker operations
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
/// Metrics collector for Kafka broker operations.
///
/// Every field is a lock-free atomic, so a single instance can be shared
/// (e.g. behind an `Arc`) and updated concurrently from many connection
/// handlers without additional locking.
#[derive(Debug)]
pub struct KafkaMetrics {
    /// Total number of connections ever accepted (monotonic counter).
    pub connections_total: AtomicU64,
    /// Number of currently open connections (gauge: incremented on
    /// connect, decremented on close).
    pub connections_active: AtomicU64,
    /// Total requests received across all API keys (monotonic counter).
    pub requests_total: AtomicU64,
    /// Per-API request counters, keyed by Kafka API key.
    ///
    /// Only the keys pre-registered in `new()` are tracked here; requests
    /// for any other API key are counted solely in `requests_total`
    /// (see `record_request`).
    pub requests_by_api: HashMap<i16, AtomicU64>,
    /// Total responses sent (monotonic counter).
    pub responses_total: AtomicU64,
    /// Total messages produced (monotonic counter).
    pub messages_produced_total: AtomicU64,
    /// Total messages consumed (monotonic counter).
    pub messages_consumed_total: AtomicU64,
    /// Total topics created (monotonic counter).
    pub topics_created_total: AtomicU64,
    /// Total topics deleted (monotonic counter).
    pub topics_deleted_total: AtomicU64,
    /// Total consumer groups created (only ever incremented here).
    pub consumer_groups_total: AtomicU64,
    /// Total partitions created (only ever incremented here).
    pub partitions_total: AtomicU64,
    /// Rolling average of request latency in microseconds
    /// (see `record_request_latency` for how it is maintained).
    pub request_latency_micros: AtomicU64,
    /// Total error responses (monotonic counter).
    pub errors_total: AtomicU64,
}
37
38impl KafkaMetrics {
39    /// Create a new metrics collector
40    pub fn new() -> Self {
41        let mut requests_by_api = HashMap::new();
42        // Initialize counters for common API keys
43        for api_key in &[0, 1, 3, 9, 15, 16, 18, 19, 20, 32, 49] {
44            requests_by_api.insert(*api_key, AtomicU64::new(0));
45        }
46
47        Self {
48            connections_total: AtomicU64::new(0),
49            connections_active: AtomicU64::new(0),
50            requests_total: AtomicU64::new(0),
51            requests_by_api,
52            responses_total: AtomicU64::new(0),
53            messages_produced_total: AtomicU64::new(0),
54            messages_consumed_total: AtomicU64::new(0),
55            topics_created_total: AtomicU64::new(0),
56            topics_deleted_total: AtomicU64::new(0),
57            consumer_groups_total: AtomicU64::new(0),
58            partitions_total: AtomicU64::new(0),
59            request_latency_micros: AtomicU64::new(0),
60            errors_total: AtomicU64::new(0),
61        }
62    }
63
64    /// Record a new connection
65    pub fn record_connection(&self) {
66        self.connections_total.fetch_add(1, Ordering::Relaxed);
67        self.connections_active.fetch_add(1, Ordering::Relaxed);
68    }
69
70    /// Record a connection closed
71    pub fn record_connection_closed(&self) {
72        self.connections_active.fetch_sub(1, Ordering::Relaxed);
73    }
74
75    /// Record a request
76    pub fn record_request(&self, api_key: i16) {
77        self.requests_total.fetch_add(1, Ordering::Relaxed);
78        if let Some(counter) = self.requests_by_api.get(&api_key) {
79            counter.fetch_add(1, Ordering::Relaxed);
80        }
81    }
82
83    /// Record a response
84    pub fn record_response(&self) {
85        self.responses_total.fetch_add(1, Ordering::Relaxed);
86    }
87
88    /// Record messages produced
89    pub fn record_messages_produced(&self, count: u64) {
90        self.messages_produced_total.fetch_add(count, Ordering::Relaxed);
91    }
92
93    /// Record messages consumed
94    pub fn record_messages_consumed(&self, count: u64) {
95        self.messages_consumed_total.fetch_add(count, Ordering::Relaxed);
96    }
97
98    /// Record topic created
99    pub fn record_topic_created(&self) {
100        self.topics_created_total.fetch_add(1, Ordering::Relaxed);
101    }
102
103    /// Record topic deleted
104    pub fn record_topic_deleted(&self) {
105        self.topics_deleted_total.fetch_add(1, Ordering::Relaxed);
106    }
107
108    /// Record consumer group created
109    pub fn record_consumer_group_created(&self) {
110        self.consumer_groups_total.fetch_add(1, Ordering::Relaxed);
111    }
112
113    /// Record partition created
114    pub fn record_partition_created(&self) {
115        self.partitions_total.fetch_add(1, Ordering::Relaxed);
116    }
117
118    /// Record request latency
119    pub fn record_request_latency(&self, latency_micros: u64) {
120        // Simple moving average - in production, you'd want more sophisticated tracking
121        let current = self.request_latency_micros.load(Ordering::Relaxed);
122        let new_avg = (current + latency_micros) / 2;
123        self.request_latency_micros.store(new_avg, Ordering::Relaxed);
124    }
125
126    /// Record an error
127    pub fn record_error(&self) {
128        self.errors_total.fetch_add(1, Ordering::Relaxed);
129    }
130
131    /// Get metrics snapshot
132    pub fn snapshot(&self) -> MetricsSnapshot {
133        MetricsSnapshot {
134            connections_total: self.connections_total.load(Ordering::Relaxed),
135            connections_active: self.connections_active.load(Ordering::Relaxed),
136            requests_total: self.requests_total.load(Ordering::Relaxed),
137            responses_total: self.responses_total.load(Ordering::Relaxed),
138            messages_produced_total: self.messages_produced_total.load(Ordering::Relaxed),
139            messages_consumed_total: self.messages_consumed_total.load(Ordering::Relaxed),
140            topics_created_total: self.topics_created_total.load(Ordering::Relaxed),
141            topics_deleted_total: self.topics_deleted_total.load(Ordering::Relaxed),
142            consumer_groups_total: self.consumer_groups_total.load(Ordering::Relaxed),
143            partitions_total: self.partitions_total.load(Ordering::Relaxed),
144            avg_request_latency_micros: self.request_latency_micros.load(Ordering::Relaxed),
145            errors_total: self.errors_total.load(Ordering::Relaxed),
146        }
147    }
148}
149
/// Snapshot of metrics at a point in time.
///
/// Plain-old-data copy of every scalar counter: all fields are `u64`,
/// so the type is `Copy` and cheap to pass around or send across
/// threads. `Default` gives an all-zero snapshot, and `PartialEq`/`Eq`
/// make snapshots directly comparable in tests and diffing logic.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct MetricsSnapshot {
    /// Total connections ever accepted.
    pub connections_total: u64,
    /// Connections open at snapshot time.
    pub connections_active: u64,
    /// Total requests received.
    pub requests_total: u64,
    /// Total responses sent.
    pub responses_total: u64,
    /// Total messages produced.
    pub messages_produced_total: u64,
    /// Total messages consumed.
    pub messages_consumed_total: u64,
    /// Total topics created.
    pub topics_created_total: u64,
    /// Total topics deleted.
    pub topics_deleted_total: u64,
    /// Total consumer groups created.
    pub consumer_groups_total: u64,
    /// Total partitions created.
    pub partitions_total: u64,
    /// Rolling average request latency in microseconds.
    pub avg_request_latency_micros: u64,
    /// Total error responses.
    pub errors_total: u64,
}
166
167impl Default for KafkaMetrics {
168    fn default() -> Self {
169        Self::new()
170    }
171}
172
/// Metrics exporter for Prometheus-style metrics.
pub struct MetricsExporter {
    /// Shared handle to the live broker metrics; values are read via a
    /// snapshot at export time, never mutated by the exporter.
    metrics: Arc<KafkaMetrics>,
}
177
178impl MetricsExporter {
179    /// Create a new metrics exporter
180    pub fn new(metrics: Arc<KafkaMetrics>) -> Self {
181        Self { metrics }
182    }
183
184    /// Export metrics in Prometheus format
185    pub fn export_prometheus(&self) -> String {
186        let snapshot = self.metrics.snapshot();
187
188        format!(
189            "# HELP kafka_connections_total Total number of connections\n\
190             # TYPE kafka_connections_total counter\n\
191             kafka_connections_total {}\n\
192             # HELP kafka_connections_active Number of active connections\n\
193             # TYPE kafka_connections_active gauge\n\
194             kafka_connections_active {}\n\
195             # HELP kafka_requests_total Total number of requests\n\
196             # TYPE kafka_requests_total counter\n\
197             kafka_requests_total {}\n\
198             # HELP kafka_responses_total Total number of responses\n\
199             # TYPE kafka_responses_total counter\n\
200             kafka_responses_total {}\n\
201             # HELP kafka_messages_produced_total Total messages produced\n\
202             # TYPE kafka_messages_produced_total counter\n\
203             kafka_messages_produced_total {}\n\
204             # HELP kafka_messages_consumed_total Total messages consumed\n\
205             # TYPE kafka_messages_consumed_total counter\n\
206             kafka_messages_consumed_total {}\n\
207             # HELP kafka_topics_created_total Total topics created\n\
208             # TYPE kafka_topics_created_total counter\n\
209             kafka_topics_created_total {}\n\
210             # HELP kafka_topics_deleted_total Total topics deleted\n\
211             # TYPE kafka_topics_deleted_total counter\n\
212             kafka_topics_deleted_total {}\n\
213             # HELP kafka_consumer_groups_total Total consumer groups\n\
214             # TYPE kafka_consumer_groups_total gauge\n\
215             kafka_consumer_groups_total {}\n\
216             # HELP kafka_partitions_total Total partitions\n\
217             # TYPE kafka_partitions_total gauge\n\
218             kafka_partitions_total {}\n\
219             # HELP kafka_request_latency_micros_avg Average request latency in microseconds\n\
220             # TYPE kafka_request_latency_micros_avg gauge\n\
221             kafka_request_latency_micros_avg {}\n\
222             # HELP kafka_errors_total Total errors\n\
223             # TYPE kafka_errors_total counter\n\
224             kafka_errors_total {}\n",
225            snapshot.connections_total,
226            snapshot.connections_active,
227            snapshot.requests_total,
228            snapshot.responses_total,
229            snapshot.messages_produced_total,
230            snapshot.messages_consumed_total,
231            snapshot.topics_created_total,
232            snapshot.topics_deleted_total,
233            snapshot.consumer_groups_total,
234            snapshot.partitions_total,
235            snapshot.avg_request_latency_micros,
236            snapshot.errors_total
237        )
238    }
239}
240
#[cfg(test)]
mod tests {
    //! Unit tests covering counter recording, snapshotting, and the
    //! Prometheus exporter. Each test builds a fresh `KafkaMetrics`,
    //! so tests are independent and order-insensitive.

    use super::*;

    #[test]
    fn test_kafka_metrics_new() {
        let metrics = KafkaMetrics::new();
        assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.connections_active.load(Ordering::Relaxed), 0);
        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_kafka_metrics_default() {
        let metrics = KafkaMetrics::default();
        assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn test_record_connection() {
        let metrics = KafkaMetrics::new();
        metrics.record_connection();
        assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 1);
        assert_eq!(metrics.connections_active.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_record_connection_closed() {
        // Closing one of two connections decrements the gauge but not the total.
        let metrics = KafkaMetrics::new();
        metrics.record_connection();
        metrics.record_connection();
        metrics.record_connection_closed();
        assert_eq!(metrics.connections_total.load(Ordering::Relaxed), 2);
        assert_eq!(metrics.connections_active.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_record_request() {
        let metrics = KafkaMetrics::new();
        metrics.record_request(0); // Produce API
        metrics.record_request(1); // Fetch API
        metrics.record_request(0);
        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn test_record_request_unknown_api() {
        // Unknown API keys still count toward the overall total even
        // though no per-API counter exists for them.
        let metrics = KafkaMetrics::new();
        metrics.record_request(999); // Unknown API key
        assert_eq!(metrics.requests_total.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_record_response() {
        let metrics = KafkaMetrics::new();
        metrics.record_response();
        metrics.record_response();
        assert_eq!(metrics.responses_total.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_record_messages_produced() {
        let metrics = KafkaMetrics::new();
        metrics.record_messages_produced(10);
        metrics.record_messages_produced(5);
        assert_eq!(metrics.messages_produced_total.load(Ordering::Relaxed), 15);
    }

    #[test]
    fn test_record_messages_consumed() {
        let metrics = KafkaMetrics::new();
        metrics.record_messages_consumed(20);
        assert_eq!(metrics.messages_consumed_total.load(Ordering::Relaxed), 20);
    }

    #[test]
    fn test_record_topic_created() {
        let metrics = KafkaMetrics::new();
        metrics.record_topic_created();
        metrics.record_topic_created();
        assert_eq!(metrics.topics_created_total.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_record_topic_deleted() {
        let metrics = KafkaMetrics::new();
        metrics.record_topic_deleted();
        assert_eq!(metrics.topics_deleted_total.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_record_consumer_group_created() {
        let metrics = KafkaMetrics::new();
        metrics.record_consumer_group_created();
        assert_eq!(metrics.consumer_groups_total.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_record_partition_created() {
        let metrics = KafkaMetrics::new();
        metrics.record_partition_created();
        metrics.record_partition_created();
        metrics.record_partition_created();
        assert_eq!(metrics.partitions_total.load(Ordering::Relaxed), 3);
    }

    #[test]
    fn test_record_request_latency() {
        let metrics = KafkaMetrics::new();
        metrics.record_request_latency(100);
        metrics.record_request_latency(200);
        // Moving average - result depends on implementation, so only
        // assert that some non-zero latency was recorded rather than
        // pinning an exact averaged value.
        let latency = metrics.request_latency_micros.load(Ordering::Relaxed);
        assert!(latency > 0);
    }

    #[test]
    fn test_record_error() {
        let metrics = KafkaMetrics::new();
        metrics.record_error();
        metrics.record_error();
        assert_eq!(metrics.errors_total.load(Ordering::Relaxed), 2);
    }

    #[test]
    fn test_snapshot() {
        let metrics = KafkaMetrics::new();
        metrics.record_connection();
        metrics.record_request(0);
        metrics.record_messages_produced(5);

        let snapshot = metrics.snapshot();
        assert_eq!(snapshot.connections_total, 1);
        assert_eq!(snapshot.connections_active, 1);
        assert_eq!(snapshot.requests_total, 1);
        assert_eq!(snapshot.messages_produced_total, 5);
    }

    #[test]
    fn test_snapshot_clone() {
        let metrics = KafkaMetrics::new();
        metrics.record_connection();

        let snapshot = metrics.snapshot();
        let cloned = snapshot.clone();
        assert_eq!(snapshot.connections_total, cloned.connections_total);
    }

    #[test]
    fn test_metrics_exporter_new() {
        let metrics = Arc::new(KafkaMetrics::new());
        let _exporter = MetricsExporter::new(metrics);
    }

    #[test]
    fn test_metrics_exporter_prometheus() {
        let metrics = Arc::new(KafkaMetrics::new());
        metrics.record_connection();
        metrics.record_messages_produced(100);

        let exporter = MetricsExporter::new(metrics);
        let output = exporter.export_prometheus();

        // Check for representative sample lines plus the HELP/TYPE
        // scaffolding required by the Prometheus text format.
        assert!(output.contains("kafka_connections_total 1"));
        assert!(output.contains("kafka_messages_produced_total 100"));
        assert!(output.contains("# HELP"));
        assert!(output.contains("# TYPE"));
    }

    #[test]
    fn test_kafka_metrics_debug() {
        let metrics = KafkaMetrics::new();
        let debug = format!("{:?}", metrics);
        assert!(debug.contains("KafkaMetrics"));
    }

    #[test]
    fn test_metrics_snapshot_debug() {
        let metrics = KafkaMetrics::new();
        let snapshot = metrics.snapshot();
        let debug = format!("{:?}", snapshot);
        assert!(debug.contains("MetricsSnapshot"));
    }
}