1#[cfg(feature = "metrics")]
32use std::sync::OnceLock;
33use std::time::{Duration, Instant};
34
35pub use metrics::{counter, gauge, histogram};
37
/// Outcome of the one-time Prometheus exporter installation.
///
/// Holds `Ok(())` when the exporter was installed, or the rendered error
/// message when installation failed, so that every call to `init_metrics`
/// can report the result of the single attempt.
#[cfg(feature = "metrics")]
static METRICS_INITIALIZED: OnceLock<Result<(), String>> = OnceLock::new();

/// Installs the Prometheus exporter with an HTTP listener on `addr`.
///
/// Installation is attempted at most once per process: the first call performs
/// it and every later call returns the stored outcome of that attempt without
/// touching the exporter again (the `addr` of later calls is ignored).
///
/// # Arguments
///
/// * `addr` - socket address passed to the exporter's HTTP listener.
///
/// # Errors
///
/// Returns the exporter's installation error, rendered as a message, if the
/// one-time installation failed. The failure is also logged through `tracing`.
/// Callers that treat metrics as best-effort can ignore the error.
#[cfg(feature = "metrics")]
pub fn init_metrics(
    addr: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let outcome = METRICS_INITIALIZED.get_or_init(|| {
        match metrics_exporter_prometheus::PrometheusBuilder::new()
            .with_http_listener(addr)
            .install()
        {
            Ok(()) => {
                tracing::info!(
                    "Prometheus metrics server listening on http://{}/metrics",
                    addr
                );
                Ok(())
            }
            Err(e) => {
                tracing::error!("Failed to start Prometheus exporter: {}", e);
                // Keep the message (not the error value) so the outcome can be
                // handed out again on every subsequent call.
                Err(e.to_string())
            }
        }
    });

    match outcome {
        Ok(()) => Ok(()),
        Err(message) => Err(message.clone().into()),
    }
}
77
/// Builds a default `PrometheusBuilder`, installs its recorder, and returns
/// the resulting `PrometheusHandle`.
///
/// # Errors
///
/// Propagates, boxed, whatever error `install_recorder` reports.
#[cfg(feature = "metrics")]
pub fn init_metrics_recorder(
) -> Result<metrics_exporter_prometheus::PrometheusHandle, Box<dyn std::error::Error + Send + Sync>>
{
    metrics_exporter_prometheus::PrometheusBuilder::new()
        .install_recorder()
        .map_err(|err| err.into())
}
87
/// No-op stand-in for `init_metrics` used when the `metrics` feature is off.
///
/// Accepts the same argument as the feature-enabled variant so call sites
/// compile unchanged, ignores it, and always returns `Ok(())`.
#[cfg(not(feature = "metrics"))]
pub fn init_metrics(
    _addr: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Nothing to install in this configuration.
    Ok(())
}
95
96pub struct CoreMetrics;
102
103impl CoreMetrics {
104 pub fn increment_messages_appended() {
108 metrics::counter!("rivven_core_messages_appended_total").increment(1);
109 }
110
111 pub fn add_messages_appended(count: u64) {
113 metrics::counter!("rivven_core_messages_appended_total").increment(count);
114 }
115
116 pub fn add_messages_read(count: u64) {
118 metrics::counter!("rivven_core_messages_read_total").increment(count);
119 }
120
121 pub fn increment_batch_appends() {
123 metrics::counter!("rivven_core_batch_appends_total").increment(1);
124 }
125
126 pub fn increment_schemas_registered() {
128 metrics::counter!("rivven_core_schemas_registered_total").increment(1);
129 }
130
131 pub fn increment_schema_cache_hits() {
133 metrics::counter!("rivven_core_schema_cache_hits_total").increment(1);
134 }
135
136 pub fn increment_schema_cache_misses() {
138 metrics::counter!("rivven_core_schema_cache_misses_total").increment(1);
139 }
140
141 pub fn increment_auth_failures(auth_type: &str) {
145 metrics::counter!("rivven_core_auth_failures_total", "type" => auth_type.to_string())
146 .increment(1);
147 }
148
149 pub fn increment_rate_limit_rejections() {
151 metrics::counter!("rivven_core_rate_limit_rejections_total").increment(1);
152 }
153
154 pub fn increment_circuit_breaker_trips() {
156 metrics::counter!("rivven_core_circuit_breaker_trips_total").increment(1);
157 }
158
159 pub fn increment_sql_injection_blocked() {
161 metrics::counter!("rivven_core_sql_injection_blocked_total").increment(1);
162 }
163
164 pub fn increment_connection_timeouts() {
166 metrics::counter!("rivven_core_connection_timeouts_total").increment(1);
167 }
168
169 pub fn increment_message_size_exceeded() {
171 metrics::counter!("rivven_core_message_size_exceeded_total").increment(1);
172 }
173
174 pub fn set_active_connections(count: u64) {
178 metrics::gauge!("rivven_core_active_connections").set(count as f64);
179 }
180
181 pub fn set_partition_count(count: u64) {
183 metrics::gauge!("rivven_core_partition_count").set(count as f64);
184 }
185
186 pub fn set_segment_count(count: u64) {
188 metrics::gauge!("rivven_core_segment_count").set(count as f64);
189 }
190
191 pub fn set_partition_offset(topic: &str, partition: u32, offset: u64) {
193 metrics::gauge!(
194 "rivven_core_partition_offset",
195 "topic" => topic.to_string(),
196 "partition" => partition.to_string()
197 )
198 .set(offset as f64);
199 }
200
201 pub fn set_schema_cache_size(size: u64) {
203 metrics::gauge!("rivven_core_schema_cache_size").set(size as f64);
204 }
205
206 pub fn record_append_latency_us(us: u64) {
210 metrics::histogram!("rivven_core_append_latency_seconds").record(us as f64 / 1_000_000.0);
211 }
212
213 pub fn record_batch_append_latency_us(us: u64) {
215 metrics::histogram!("rivven_core_batch_append_latency_seconds")
216 .record(us as f64 / 1_000_000.0);
217 }
218
219 pub fn record_read_latency_us(us: u64) {
221 metrics::histogram!("rivven_core_read_latency_seconds").record(us as f64 / 1_000_000.0);
222 }
223
224 pub fn record_schema_lookup_latency_ns(ns: u64) {
226 metrics::histogram!("rivven_core_schema_lookup_latency_seconds")
227 .record(ns as f64 / 1_000_000_000.0);
228 }
229}
230
/// Stopwatch that remembers the instant it was created and reports how much
/// time has passed since then.
pub struct Timer {
    /// Instant captured by [`Timer::new`].
    start: Instant,
}

impl Timer {
    /// Starts a timer at the current instant.
    pub fn new() -> Self {
        let start = Instant::now();
        Self { start }
    }

    /// Time since creation in whole microseconds, cast to `u64`.
    pub fn elapsed_us(&self) -> u64 {
        let micros: u128 = self.elapsed().as_micros();
        micros as u64
    }

    /// Time since creation in whole nanoseconds, cast to `u64`.
    pub fn elapsed_ns(&self) -> u64 {
        let nanos: u128 = self.elapsed().as_nanos();
        nanos as u64
    }

    /// Time since creation as a [`Duration`].
    pub fn elapsed(&self) -> Duration {
        Instant::elapsed(&self.start)
    }
}

impl Default for Timer {
    /// Same as [`Timer::new`]: the timer starts now.
    fn default() -> Self {
        Timer::new()
    }
}
269
#[cfg(test)]
mod tests {
    use super::{CoreMetrics, Timer};

    /// Smoke test: invokes every `CoreMetrics` function once. It makes no
    /// assertions; it passes as long as the calls compile and do not panic.
    #[test]
    fn test_core_metrics_compile() {
        // Counters.
        CoreMetrics::increment_messages_appended();
        CoreMetrics::add_messages_appended(100);
        CoreMetrics::add_messages_read(50);
        CoreMetrics::increment_batch_appends();
        CoreMetrics::increment_schemas_registered();
        CoreMetrics::increment_schema_cache_hits();
        CoreMetrics::increment_schema_cache_misses();
        CoreMetrics::increment_auth_failures("md5");
        CoreMetrics::increment_rate_limit_rejections();
        CoreMetrics::increment_circuit_breaker_trips();
        CoreMetrics::increment_sql_injection_blocked();
        CoreMetrics::increment_connection_timeouts();
        CoreMetrics::increment_message_size_exceeded();

        // Gauges.
        CoreMetrics::set_active_connections(10);
        CoreMetrics::set_partition_count(100);
        CoreMetrics::set_segment_count(1000);
        CoreMetrics::set_partition_offset("orders", 0, 12345);
        CoreMetrics::set_schema_cache_size(50);

        // Histograms.
        CoreMetrics::record_append_latency_us(100);
        CoreMetrics::record_batch_append_latency_us(500);
        CoreMetrics::record_read_latency_us(200);
        CoreMetrics::record_schema_lookup_latency_ns(30);
    }

    /// After sleeping for one millisecond the timer must report at least that
    /// much elapsed time in both of its integer units.
    #[test]
    fn test_timer() {
        let pause = std::time::Duration::from_millis(1);
        let stopwatch = Timer::new();
        std::thread::sleep(pause);

        let micros = stopwatch.elapsed_us();
        assert!(micros >= 1000);
        let nanos = stopwatch.elapsed_ns();
        assert!(nanos >= 1_000_000);
    }
}