mockforge_kafka/
metrics.rs

1//! Metrics and monitoring for Kafka broker operations
2
3use std::collections::HashMap;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::Arc;
6
/// Counters and gauges describing Kafka broker activity.
///
/// Every scalar metric is stored in an [`AtomicU64`], so the collector can be
/// updated through a shared reference (for example from behind an `Arc`).
#[derive(Debug)]
pub struct KafkaMetrics {
    /// Number of connections recorded since the collector was created.
    pub connections_total: AtomicU64,
    /// Number of connections currently open.
    pub connections_active: AtomicU64,
    /// Number of requests received, across all API keys.
    pub requests_total: AtomicU64,
    /// Per-API request counters, keyed by the Kafka API key of the request.
    pub requests_by_api: HashMap<i16, AtomicU64>,
    /// Number of responses sent.
    pub responses_total: AtomicU64,
    /// Number of messages produced.
    pub messages_produced_total: AtomicU64,
    /// Number of messages consumed.
    pub messages_consumed_total: AtomicU64,
    /// Number of topics created.
    pub topics_created_total: AtomicU64,
    /// Number of topics deleted.
    pub topics_deleted_total: AtomicU64,
    /// Number of consumer groups recorded.
    pub consumer_groups_total: AtomicU64,
    /// Number of partitions recorded.
    pub partitions_total: AtomicU64,
    /// Smoothed request latency, in microseconds.
    pub request_latency_micros: AtomicU64,
    /// Number of error responses.
    pub errors_total: AtomicU64,
}
37
38impl KafkaMetrics {
39    /// Create a new metrics collector
40    pub fn new() -> Self {
41        let mut requests_by_api = HashMap::new();
42        // Initialize counters for common API keys
43        for api_key in &[0, 1, 3, 9, 15, 16, 18, 19, 20, 32, 49] {
44            requests_by_api.insert(*api_key, AtomicU64::new(0));
45        }
46
47        Self {
48            connections_total: AtomicU64::new(0),
49            connections_active: AtomicU64::new(0),
50            requests_total: AtomicU64::new(0),
51            requests_by_api,
52            responses_total: AtomicU64::new(0),
53            messages_produced_total: AtomicU64::new(0),
54            messages_consumed_total: AtomicU64::new(0),
55            topics_created_total: AtomicU64::new(0),
56            topics_deleted_total: AtomicU64::new(0),
57            consumer_groups_total: AtomicU64::new(0),
58            partitions_total: AtomicU64::new(0),
59            request_latency_micros: AtomicU64::new(0),
60            errors_total: AtomicU64::new(0),
61        }
62    }
63
64    /// Record a new connection
65    pub fn record_connection(&self) {
66        self.connections_total.fetch_add(1, Ordering::Relaxed);
67        self.connections_active.fetch_add(1, Ordering::Relaxed);
68    }
69
70    /// Record a connection closed
71    pub fn record_connection_closed(&self) {
72        self.connections_active.fetch_sub(1, Ordering::Relaxed);
73    }
74
75    /// Record a request
76    pub fn record_request(&self, api_key: i16) {
77        self.requests_total.fetch_add(1, Ordering::Relaxed);
78        if let Some(counter) = self.requests_by_api.get(&api_key) {
79            counter.fetch_add(1, Ordering::Relaxed);
80        }
81    }
82
83    /// Record a response
84    pub fn record_response(&self) {
85        self.responses_total.fetch_add(1, Ordering::Relaxed);
86    }
87
88    /// Record messages produced
89    pub fn record_messages_produced(&self, count: u64) {
90        self.messages_produced_total.fetch_add(count, Ordering::Relaxed);
91    }
92
93    /// Record messages consumed
94    pub fn record_messages_consumed(&self, count: u64) {
95        self.messages_consumed_total.fetch_add(count, Ordering::Relaxed);
96    }
97
98    /// Record topic created
99    pub fn record_topic_created(&self) {
100        self.topics_created_total.fetch_add(1, Ordering::Relaxed);
101    }
102
103    /// Record topic deleted
104    pub fn record_topic_deleted(&self) {
105        self.topics_deleted_total.fetch_add(1, Ordering::Relaxed);
106    }
107
108    /// Record consumer group created
109    pub fn record_consumer_group_created(&self) {
110        self.consumer_groups_total.fetch_add(1, Ordering::Relaxed);
111    }
112
113    /// Record partition created
114    pub fn record_partition_created(&self) {
115        self.partitions_total.fetch_add(1, Ordering::Relaxed);
116    }
117
118    /// Record request latency
119    pub fn record_request_latency(&self, latency_micros: u64) {
120        // Simple moving average - in production, you'd want more sophisticated tracking
121        let current = self.request_latency_micros.load(Ordering::Relaxed);
122        let new_avg = (current + latency_micros) / 2;
123        self.request_latency_micros.store(new_avg, Ordering::Relaxed);
124    }
125
126    /// Record an error
127    pub fn record_error(&self) {
128        self.errors_total.fetch_add(1, Ordering::Relaxed);
129    }
130
131    /// Get metrics snapshot
132    pub fn snapshot(&self) -> MetricsSnapshot {
133        MetricsSnapshot {
134            connections_total: self.connections_total.load(Ordering::Relaxed),
135            connections_active: self.connections_active.load(Ordering::Relaxed),
136            requests_total: self.requests_total.load(Ordering::Relaxed),
137            responses_total: self.responses_total.load(Ordering::Relaxed),
138            messages_produced_total: self.messages_produced_total.load(Ordering::Relaxed),
139            messages_consumed_total: self.messages_consumed_total.load(Ordering::Relaxed),
140            topics_created_total: self.topics_created_total.load(Ordering::Relaxed),
141            topics_deleted_total: self.topics_deleted_total.load(Ordering::Relaxed),
142            consumer_groups_total: self.consumer_groups_total.load(Ordering::Relaxed),
143            partitions_total: self.partitions_total.load(Ordering::Relaxed),
144            avg_request_latency_micros: self.request_latency_micros.load(Ordering::Relaxed),
145            errors_total: self.errors_total.load(Ordering::Relaxed),
146        }
147    }
148}
149
/// Snapshot of metrics at a point in time.
///
/// A plain-data copy of the scalar metrics, so it can be cloned, compared and
/// formatted without touching the live counters.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MetricsSnapshot {
    /// Total number of connections.
    pub connections_total: u64,
    /// Number of active connections.
    pub connections_active: u64,
    /// Total number of requests.
    pub requests_total: u64,
    /// Total number of responses.
    pub responses_total: u64,
    /// Total messages produced.
    pub messages_produced_total: u64,
    /// Total messages consumed.
    pub messages_consumed_total: u64,
    /// Total topics created.
    pub topics_created_total: u64,
    /// Total topics deleted.
    pub topics_deleted_total: u64,
    /// Total consumer groups.
    pub consumer_groups_total: u64,
    /// Total partitions.
    pub partitions_total: u64,
    /// Average request latency in microseconds.
    pub avg_request_latency_micros: u64,
    /// Total errors.
    pub errors_total: u64,
}
166
167impl Default for KafkaMetrics {
168    fn default() -> Self {
169        Self::new()
170    }
171}
172
173/// Metrics exporter for Prometheus-style metrics
174pub struct MetricsExporter {
175    metrics: Arc<KafkaMetrics>,
176}
177
178impl MetricsExporter {
179    /// Create a new metrics exporter
180    pub fn new(metrics: Arc<KafkaMetrics>) -> Self {
181        Self { metrics }
182    }
183
184    /// Export metrics in Prometheus format
185    pub fn export_prometheus(&self) -> String {
186        let snapshot = self.metrics.snapshot();
187
188        format!(
189            "# HELP kafka_connections_total Total number of connections\n\
190             # TYPE kafka_connections_total counter\n\
191             kafka_connections_total {}\n\
192             # HELP kafka_connections_active Number of active connections\n\
193             # TYPE kafka_connections_active gauge\n\
194             kafka_connections_active {}\n\
195             # HELP kafka_requests_total Total number of requests\n\
196             # TYPE kafka_requests_total counter\n\
197             kafka_requests_total {}\n\
198             # HELP kafka_responses_total Total number of responses\n\
199             # TYPE kafka_responses_total counter\n\
200             kafka_responses_total {}\n\
201             # HELP kafka_messages_produced_total Total messages produced\n\
202             # TYPE kafka_messages_produced_total counter\n\
203             kafka_messages_produced_total {}\n\
204             # HELP kafka_messages_consumed_total Total messages consumed\n\
205             # TYPE kafka_messages_consumed_total counter\n\
206             kafka_messages_consumed_total {}\n\
207             # HELP kafka_topics_created_total Total topics created\n\
208             # TYPE kafka_topics_created_total counter\n\
209             kafka_topics_created_total {}\n\
210             # HELP kafka_topics_deleted_total Total topics deleted\n\
211             # TYPE kafka_topics_deleted_total counter\n\
212             kafka_topics_deleted_total {}\n\
213             # HELP kafka_consumer_groups_total Total consumer groups\n\
214             # TYPE kafka_consumer_groups_total gauge\n\
215             kafka_consumer_groups_total {}\n\
216             # HELP kafka_partitions_total Total partitions\n\
217             # TYPE kafka_partitions_total gauge\n\
218             kafka_partitions_total {}\n\
219             # HELP kafka_request_latency_micros_avg Average request latency in microseconds\n\
220             # TYPE kafka_request_latency_micros_avg gauge\n\
221             kafka_request_latency_micros_avg {}\n\
222             # HELP kafka_errors_total Total errors\n\
223             # TYPE kafka_errors_total counter\n\
224             kafka_errors_total {}\n",
225            snapshot.connections_total,
226            snapshot.connections_active,
227            snapshot.requests_total,
228            snapshot.responses_total,
229            snapshot.messages_produced_total,
230            snapshot.messages_consumed_total,
231            snapshot.topics_created_total,
232            snapshot.topics_deleted_total,
233            snapshot.consumer_groups_total,
234            snapshot.partitions_total,
235            snapshot.avg_request_latency_micros,
236            snapshot.errors_total
237        )
238    }
239}