rag_plusplus_core/observability/
metrics.rs

//! Metrics Module
//!
//! Prometheus-compatible metrics for RAG++ operations.
4
5use metrics::{counter, gauge, histogram};
6use std::sync::atomic::{AtomicU64, Ordering};
7use std::time::{Duration, Instant};
8
/// Configuration for the RAG++ metrics collector.
///
/// Start from [`MetricsConfig::new`] (identical to [`Default::default`]) and
/// adjust individual settings with the chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct MetricsConfig {
    /// Prefix for all metric names (the default `ragpp` yields names such as
    /// `ragpp_queries_total`).
    pub prefix: String,
    /// Enable detailed per-index metrics
    pub per_index_metrics: bool,
    /// Histogram buckets for latency (in seconds)
    pub latency_buckets: Vec<f64>,
}

impl Default for MetricsConfig {
    /// Defaults: prefix `ragpp`, per-index metrics enabled, and twelve
    /// latency buckets ranging from 1 ms to 10 s.
    fn default() -> Self {
        Self {
            prefix: "ragpp".into(),
            per_index_metrics: true,
            latency_buckets: vec![
                0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
            ],
        }
    }
}

impl MetricsConfig {
    /// Create new config with defaults.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the metric-name prefix.
    #[must_use]
    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = prefix.into();
        self
    }

    /// Enable or disable detailed per-index metrics.
    #[must_use]
    pub fn with_per_index_metrics(mut self, enabled: bool) -> Self {
        self.per_index_metrics = enabled;
        self
    }

    /// Replace the latency histogram buckets (values are in seconds).
    ///
    /// The buckets are stored as given; no sorting or validation is applied.
    #[must_use]
    pub fn with_latency_buckets(mut self, buckets: Vec<f64>) -> Self {
        self.latency_buckets = buckets;
        self
    }
}
46
47/// Central metrics collector for RAG++.
48///
49/// Provides counters, gauges, and histograms for monitoring.
50///
51/// # Metrics
52///
53/// - `ragpp_queries_total` - Total queries processed
54/// - `ragpp_query_latency_seconds` - Query latency histogram
55/// - `ragpp_index_size` - Number of vectors per index
56/// - `ragpp_cache_hits_total` - Cache hit count
57/// - `ragpp_cache_misses_total` - Cache miss count
58/// - `ragpp_wal_writes_total` - WAL write count
59/// - `ragpp_buffer_flushes_total` - Buffer flush count
60#[derive(Debug)]
61pub struct Metrics {
62    config: MetricsConfig,
63    // Counters (using atomics for thread-safety without locking)
64    queries_total: AtomicU64,
65    cache_hits: AtomicU64,
66    cache_misses: AtomicU64,
67    wal_writes: AtomicU64,
68    buffer_flushes: AtomicU64,
69    errors_total: AtomicU64,
70}
71
72impl Metrics {
73    /// Create new metrics collector.
74    #[must_use]
75    pub fn new(config: MetricsConfig) -> Self {
76        Self {
77            config,
78            queries_total: AtomicU64::new(0),
79            cache_hits: AtomicU64::new(0),
80            cache_misses: AtomicU64::new(0),
81            wal_writes: AtomicU64::new(0),
82            buffer_flushes: AtomicU64::new(0),
83            errors_total: AtomicU64::new(0),
84        }
85    }
86
87    /// Create with default config.
88    #[must_use]
89    pub fn default_metrics() -> Self {
90        Self::new(MetricsConfig::default())
91    }
92
93    /// Record a query.
94    pub fn record_query(&self, latency: Duration, result_count: usize, index_name: Option<&str>) {
95        self.queries_total.fetch_add(1, Ordering::Relaxed);
96
97        let latency_secs = latency.as_secs_f64();
98        let prefix = &self.config.prefix;
99
100        // Record to metrics crate
101        if let Some(name) = index_name {
102            histogram!(format!("{prefix}_query_latency_seconds"), "index" => name.to_string())
103                .record(latency_secs);
104            counter!(format!("{prefix}_queries_total"), "index" => name.to_string())
105                .increment(1);
106            gauge!(format!("{prefix}_query_results"), "index" => name.to_string())
107                .set(result_count as f64);
108        } else {
109            histogram!(format!("{prefix}_query_latency_seconds"))
110                .record(latency_secs);
111            counter!(format!("{prefix}_queries_total"))
112                .increment(1);
113            gauge!(format!("{prefix}_query_results"))
114                .set(result_count as f64);
115        }
116    }
117
118    /// Record a cache hit.
119    pub fn record_cache_hit(&self) {
120        self.cache_hits.fetch_add(1, Ordering::Relaxed);
121        counter!(format!("{}_cache_hits_total", self.config.prefix)).increment(1);
122    }
123
124    /// Record a cache miss.
125    pub fn record_cache_miss(&self) {
126        self.cache_misses.fetch_add(1, Ordering::Relaxed);
127        counter!(format!("{}_cache_misses_total", self.config.prefix)).increment(1);
128    }
129
130    /// Record a WAL write.
131    pub fn record_wal_write(&self) {
132        self.wal_writes.fetch_add(1, Ordering::Relaxed);
133        counter!(format!("{}_wal_writes_total", self.config.prefix)).increment(1);
134    }
135
136    /// Record a buffer flush.
137    pub fn record_buffer_flush(&self, records_flushed: usize) {
138        self.buffer_flushes.fetch_add(1, Ordering::Relaxed);
139        counter!(format!("{}_buffer_flushes_total", self.config.prefix)).increment(1);
140        counter!(format!("{}_records_flushed_total", self.config.prefix))
141            .increment(records_flushed as u64);
142    }
143
144    /// Record an error.
145    pub fn record_error(&self, error_type: &str) {
146        self.errors_total.fetch_add(1, Ordering::Relaxed);
147        counter!(
148            format!("{}_errors_total", self.config.prefix),
149            "type" => error_type.to_string()
150        )
151        .increment(1);
152    }
153
154    /// Update index size gauge.
155    pub fn set_index_size(&self, index_name: &str, size: usize) {
156        gauge!(
157            format!("{}_index_size", self.config.prefix),
158            "index" => index_name.to_string()
159        )
160        .set(size as f64);
161    }
162
163    /// Update store size gauge.
164    pub fn set_store_size(&self, size: usize) {
165        gauge!(format!("{}_store_size", self.config.prefix)).set(size as f64);
166    }
167
168    /// Update memory usage gauge.
169    pub fn set_memory_bytes(&self, bytes: usize) {
170        gauge!(format!("{}_memory_bytes", self.config.prefix)).set(bytes as f64);
171    }
172
173    /// Get snapshot of current metrics.
174    #[must_use]
175    pub fn snapshot(&self) -> MetricsSnapshot {
176        MetricsSnapshot {
177            queries_total: self.queries_total.load(Ordering::Relaxed),
178            cache_hits: self.cache_hits.load(Ordering::Relaxed),
179            cache_misses: self.cache_misses.load(Ordering::Relaxed),
180            wal_writes: self.wal_writes.load(Ordering::Relaxed),
181            buffer_flushes: self.buffer_flushes.load(Ordering::Relaxed),
182            errors_total: self.errors_total.load(Ordering::Relaxed),
183        }
184    }
185
186    /// Calculate cache hit ratio.
187    #[must_use]
188    pub fn cache_hit_ratio(&self) -> f64 {
189        let hits = self.cache_hits.load(Ordering::Relaxed);
190        let misses = self.cache_misses.load(Ordering::Relaxed);
191        let total = hits + misses;
192
193        if total == 0 {
194            0.0
195        } else {
196            hits as f64 / total as f64
197        }
198    }
199}
200
201impl Default for Metrics {
202    fn default() -> Self {
203        Self::default_metrics()
204    }
205}
206
/// Snapshot of metrics at a point in time.
///
/// A plain value type: every field is public and holds the counter value
/// observed when the snapshot was taken.
#[derive(Debug, Clone, Default)]
pub struct MetricsSnapshot {
    /// Total queries
    pub queries_total: u64,
    /// Cache hits
    pub cache_hits: u64,
    /// Cache misses
    pub cache_misses: u64,
    /// WAL writes
    pub wal_writes: u64,
    /// Buffer flushes
    pub buffer_flushes: u64,
    /// Errors
    pub errors_total: u64,
}

impl MetricsSnapshot {
    /// Calculate cache hit ratio: `cache_hits / (cache_hits + cache_misses)`.
    ///
    /// Returns `0.0` when both counters are zero.
    #[must_use]
    pub fn cache_hit_ratio(&self) -> f64 {
        // The fields are public and may hold any `u64`, so the sum is taken in
        // `u128`: a plain `u64` addition could overflow (panicking in debug
        // builds, wrapping to a wrong total in release builds).
        let lookups = u128::from(self.cache_hits) + u128::from(self.cache_misses);
        if lookups == 0 {
            return 0.0;
        }
        self.cache_hits as f64 / lookups as f64
    }
}
236
/// Timer for measuring operation duration.
///
/// Captures an [`Instant`] when started; [`Timer::elapsed`] can be polled any
/// number of times and [`Timer::stop`] consumes the timer and returns the
/// final reading.
// NOTE(review): the `dead_code` allowance is presumably here because nothing
// in the crate uses `Timer` yet — confirm and drop it once a caller exists.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct Timer {
    /// Moment at which the timer was started.
    start: Instant,
}

#[allow(dead_code)]
impl Timer {
    /// Start a new timer.
    #[must_use]
    pub fn start() -> Self {
        Self {
            start: Instant::now(),
        }
    }

    /// Get the duration elapsed since the timer was started.
    #[must_use]
    pub fn elapsed(&self) -> Duration {
        self.start.elapsed()
    }

    /// Stop timer and return duration.
    #[must_use]
    pub fn stop(self) -> Duration {
        self.elapsed()
    }
}
265
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built collector reports zero queries.
    #[test]
    fn test_metrics_creation() {
        let collector = Metrics::default_metrics();
        assert_eq!(collector.snapshot().queries_total, 0);
    }

    /// Labelled and unlabelled queries both count towards the total.
    #[test]
    fn test_record_query() {
        let collector = Metrics::default_metrics();

        collector.record_query(Duration::from_millis(50), 10, Some("test"));
        collector.record_query(Duration::from_millis(100), 5, None);

        assert_eq!(collector.snapshot().queries_total, 2);
    }

    /// Two hits and one miss give a hit ratio of roughly two thirds.
    #[test]
    fn test_cache_metrics() {
        let collector = Metrics::default_metrics();

        collector.record_cache_hit();
        collector.record_cache_hit();
        collector.record_cache_miss();

        let seen = collector.snapshot();
        assert_eq!(seen.cache_hits, 2);
        assert_eq!(seen.cache_misses, 1);

        let ratio = collector.cache_hit_ratio();
        assert!((ratio - 0.666).abs() < 0.01);
    }

    /// The timer reports at least the time that was slept.
    #[test]
    fn test_timer() {
        let pause = Duration::from_millis(10);

        let stopwatch = Timer::start();
        std::thread::sleep(pause);
        let measured = stopwatch.stop();

        assert!(measured >= Duration::from_millis(10));
    }

    /// `with_prefix` overrides the default prefix.
    #[test]
    fn test_config_builder() {
        let built = MetricsConfig::new().with_prefix("myapp");
        assert_eq!(built.prefix, "myapp");
    }
}