Skip to main content

rivven_core/
metrics.rs

1//! Unified metrics infrastructure for Rivven
2//!
3//! Uses the `metrics` crate facade for backend-agnostic instrumentation.
4//! The Prometheus exporter is enabled by the `metrics` feature flag.
5//!
6//! # Usage
7//!
8//! ```rust,ignore
9//! use rivven_core::metrics::{CoreMetrics, init_metrics};
10//! use std::net::SocketAddr;
11//!
12//! // Initialize at startup (optional - works without initialization too)
13//! let addr: SocketAddr = "0.0.0.0:9090".parse().unwrap();
14//! init_metrics(addr).expect("Failed to init metrics");
15//!
16//! // Record metrics anywhere
17//! CoreMetrics::increment_messages_appended();
18//! CoreMetrics::record_append_latency_us(150);
19//! CoreMetrics::set_partition_count(100);
20//! ```
21//!
22//! # Metric Naming Convention
23//!
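//! Metric names follow the pattern `rivven_{component}_{name}_{unit}`; counters end in `_total`, and dimensionless gauges (e.g. `rivven_core_partition_count`) omit the unit suffix.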
25//!
26//! - `rivven_core_*` - Storage engine metrics
27//! - `rivven_raft_*` - Consensus metrics (from rivven-cluster)
28//! - `rivven_cdc_*` - CDC connector metrics (from rivven-cdc)
29//! - `rivven_connect_*` - Connector framework metrics
30
31#[cfg(feature = "metrics")]
32use std::sync::OnceLock;
33use std::time::{Duration, Instant};
34
35// Re-export metrics macros for convenience
36pub use metrics::{counter, gauge, histogram};
37
/// Outcome of the one-time Prometheus exporter installation.
///
/// The first call to `init_metrics` stores its result here. The error is kept as a
/// `String` so it can be cloned and handed back to every later caller, instead of
/// later calls silently claiming success.
#[cfg(feature = "metrics")]
static METRICS_INITIALIZED: OnceLock<Result<(), String>> = OnceLock::new();

/// Initialize the Prometheus metrics exporter
///
/// This starts an HTTP server that serves metrics at `/metrics`.
/// Safe to call multiple times: installation is attempted at most once, and
/// every later call returns the outcome of that first attempt (the `addr`
/// passed to later calls is ignored).
///
/// # Errors
///
/// Returns an error if the exporter could not be installed, for example when
/// the HTTP listener cannot be created or a global metrics recorder is already
/// installed (e.g. by [`init_metrics_recorder`]). Because the attempt is made
/// only once, a failed initialization is reported again on every subsequent
/// call rather than retried.
///
/// # Example
///
/// ```rust,ignore
/// use std::net::SocketAddr;
/// use rivven_core::metrics::init_metrics;
///
/// let addr: SocketAddr = "0.0.0.0:9090".parse().unwrap();
/// init_metrics(addr).expect("Failed to start metrics server");
/// ```
#[cfg(feature = "metrics")]
pub fn init_metrics(
    addr: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let outcome = METRICS_INITIALIZED.get_or_init(|| {
        match metrics_exporter_prometheus::PrometheusBuilder::new()
            .with_http_listener(addr)
            .install()
        {
            Ok(()) => {
                tracing::info!(
                    "Prometheus metrics server listening on http://{}/metrics",
                    addr
                );
                Ok(())
            }
            Err(e) => {
                tracing::error!("Failed to start Prometheus exporter: {}", e);
                // Keep the failure so callers see it instead of a false `Ok(())`.
                Err(e.to_string())
            }
        }
    });
    // `Box<dyn Error + Send + Sync>` implements `From<String>`.
    outcome.clone().map_err(Into::into)
}
77
/// Install the Prometheus recorder without starting an HTTP server.
///
/// Intended for embedding in an existing HTTP server: no listener is opened
/// here, and the returned handle is given to the caller, who is responsible
/// for exposing the metrics.
///
/// # Errors
///
/// Propagates the builder's error if the recorder cannot be installed.
#[cfg(feature = "metrics")]
pub fn init_metrics_recorder(
) -> Result<metrics_exporter_prometheus::PrometheusHandle, Box<dyn std::error::Error + Send + Sync>>
{
    metrics_exporter_prometheus::PrometheusBuilder::new()
        .install_recorder()
        .map_err(Into::into)
}
87
/// No-op stand-in for the Prometheus-backed initializer, compiled when the
/// `metrics` feature is disabled.
///
/// Keeps call sites building regardless of features: `_addr` is ignored, no
/// server is started, and the call always succeeds.
#[cfg(not(feature = "metrics"))]
pub fn init_metrics(
    _addr: std::net::SocketAddr,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Nothing to install without the Prometheus exporter compiled in.
    Result::Ok(())
}
95
96// ============================================================================
97// Core Storage Metrics
98// ============================================================================
99
100/// Core storage engine metrics
101pub struct CoreMetrics;
102
103impl CoreMetrics {
104    // ---- Counters ----
105
106    /// Total messages appended to partitions
107    pub fn increment_messages_appended() {
108        metrics::counter!("rivven_core_messages_appended_total").increment(1);
109    }
110
111    /// Add multiple messages to the counter
112    pub fn add_messages_appended(count: u64) {
113        metrics::counter!("rivven_core_messages_appended_total").increment(count);
114    }
115
116    /// Total messages read from partitions
117    pub fn add_messages_read(count: u64) {
118        metrics::counter!("rivven_core_messages_read_total").increment(count);
119    }
120
121    /// Total batch append operations
122    pub fn increment_batch_appends() {
123        metrics::counter!("rivven_core_batch_appends_total").increment(1);
124    }
125
126    /// Schema registered
127    pub fn increment_schemas_registered() {
128        metrics::counter!("rivven_core_schemas_registered_total").increment(1);
129    }
130
131    /// Schema cache hits
132    pub fn increment_schema_cache_hits() {
133        metrics::counter!("rivven_core_schema_cache_hits_total").increment(1);
134    }
135
136    /// Schema cache misses
137    pub fn increment_schema_cache_misses() {
138        metrics::counter!("rivven_core_schema_cache_misses_total").increment(1);
139    }
140
141    // ---- Security Counters ----
142
143    /// Authentication failures by type
144    pub fn increment_auth_failures(auth_type: &str) {
145        metrics::counter!("rivven_core_auth_failures_total", "type" => auth_type.to_string())
146            .increment(1);
147    }
148
149    /// Rate limit rejections
150    pub fn increment_rate_limit_rejections() {
151        metrics::counter!("rivven_core_rate_limit_rejections_total").increment(1);
152    }
153
154    /// Circuit breaker trips
155    pub fn increment_circuit_breaker_trips() {
156        metrics::counter!("rivven_core_circuit_breaker_trips_total").increment(1);
157    }
158
159    /// SQL injection attempts blocked
160    pub fn increment_sql_injection_blocked() {
161        metrics::counter!("rivven_core_sql_injection_blocked_total").increment(1);
162    }
163
164    /// Connection timeouts
165    pub fn increment_connection_timeouts() {
166        metrics::counter!("rivven_core_connection_timeouts_total").increment(1);
167    }
168
169    /// Message size limit exceeded
170    pub fn increment_message_size_exceeded() {
171        metrics::counter!("rivven_core_message_size_exceeded_total").increment(1);
172    }
173
174    // ---- Gauges ----
175
176    /// Active connections
177    pub fn set_active_connections(count: u64) {
178        metrics::gauge!("rivven_core_active_connections").set(count as f64);
179    }
180
181    /// Total partitions
182    pub fn set_partition_count(count: u64) {
183        metrics::gauge!("rivven_core_partition_count").set(count as f64);
184    }
185
186    /// Total segments
187    pub fn set_segment_count(count: u64) {
188        metrics::gauge!("rivven_core_segment_count").set(count as f64);
189    }
190
191    /// Partition offset (per topic/partition)
192    pub fn set_partition_offset(topic: &str, partition: u32, offset: u64) {
193        metrics::gauge!(
194            "rivven_core_partition_offset",
195            "topic" => topic.to_string(),
196            "partition" => partition.to_string()
197        )
198        .set(offset as f64);
199    }
200
201    /// Schema cache size
202    pub fn set_schema_cache_size(size: u64) {
203        metrics::gauge!("rivven_core_schema_cache_size").set(size as f64);
204    }
205
206    // ---- Histograms ----
207
208    /// Record append latency in microseconds
209    pub fn record_append_latency_us(us: u64) {
210        metrics::histogram!("rivven_core_append_latency_seconds").record(us as f64 / 1_000_000.0);
211    }
212
213    /// Record batch append latency in microseconds
214    pub fn record_batch_append_latency_us(us: u64) {
215        metrics::histogram!("rivven_core_batch_append_latency_seconds")
216            .record(us as f64 / 1_000_000.0);
217    }
218
219    /// Record read latency in microseconds
220    pub fn record_read_latency_us(us: u64) {
221        metrics::histogram!("rivven_core_read_latency_seconds").record(us as f64 / 1_000_000.0);
222    }
223
224    /// Record schema lookup latency in nanoseconds
225    pub fn record_schema_lookup_latency_ns(ns: u64) {
226        metrics::histogram!("rivven_core_schema_lookup_latency_seconds")
227            .record(ns as f64 / 1_000_000_000.0);
228    }
229}
230
231// ============================================================================
232// Timer Utility
233// ============================================================================
234
/// Timer for measuring operation durations.
///
/// Captures an [`Instant`] when created; the `elapsed*` accessors report the
/// time since then. The integer accessors saturate at `u64::MAX` instead of
/// silently truncating the underlying `u128` counts.
#[derive(Debug, Clone, Copy)]
pub struct Timer {
    start: Instant,
}

impl Timer {
    /// Create a new timer starting now.
    #[must_use]
    pub fn new() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Get elapsed time in whole microseconds, saturating at `u64::MAX`.
    pub fn elapsed_us(&self) -> u64 {
        Self::saturate_to_u64(self.elapsed().as_micros())
    }

    /// Get elapsed time in whole nanoseconds, saturating at `u64::MAX`.
    pub fn elapsed_ns(&self) -> u64 {
        Self::saturate_to_u64(self.elapsed().as_nanos())
    }

    /// Get elapsed time as a [`Duration`].
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }

    /// Narrow a `u128` count to `u64`, clamping to `u64::MAX` rather than
    /// wrapping (a plain `as` cast would drop the high bits).
    fn saturate_to_u64(value: u128) -> u64 {
        u64::try_from(value).unwrap_or(u64::MAX)
    }
}

impl Default for Timer {
    fn default() -> Self {
        Self::new()
    }
}
269
270// ============================================================================
271// Tests
272// ============================================================================
273
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: every `CoreMetrics` entry point can be called and none of
    /// them panic.
    #[test]
    fn test_core_metrics_compile() {
        // Storage and schema counters.
        CoreMetrics::increment_messages_appended();
        CoreMetrics::add_messages_appended(100);
        CoreMetrics::add_messages_read(50);
        CoreMetrics::increment_batch_appends();
        CoreMetrics::increment_schemas_registered();
        CoreMetrics::increment_schema_cache_hits();
        CoreMetrics::increment_schema_cache_misses();

        // Security counters.
        CoreMetrics::increment_auth_failures("md5");
        CoreMetrics::increment_rate_limit_rejections();
        CoreMetrics::increment_circuit_breaker_trips();
        CoreMetrics::increment_sql_injection_blocked();
        CoreMetrics::increment_connection_timeouts();
        CoreMetrics::increment_message_size_exceeded();

        // Gauges.
        CoreMetrics::set_active_connections(10);
        CoreMetrics::set_partition_count(100);
        CoreMetrics::set_segment_count(1000);
        CoreMetrics::set_partition_offset("orders", 0, 12345);
        CoreMetrics::set_schema_cache_size(50);

        // Histograms.
        CoreMetrics::record_append_latency_us(100);
        CoreMetrics::record_batch_append_latency_us(500);
        CoreMetrics::record_read_latency_us(200);
        CoreMetrics::record_schema_lookup_latency_ns(30);
    }

    /// A `Timer` must report at least the time that was actually slept.
    #[test]
    fn test_timer() {
        let started = Timer::new();
        std::thread::sleep(std::time::Duration::from_millis(1));
        let micros = started.elapsed_us();
        let nanos = started.elapsed_ns();
        assert!(micros >= 1000);
        assert!(nanos >= 1_000_000);
    }
}